import builtins
import datetime
from collections.abc import AsyncIterator
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, Concatenate, Literal, TypeVar

from daft.dataframe.display import MermaidOptions
from daft.io import DataSink
from daft.io.scan import ScanOperator
from daft.io.sink import WriteResultType
from daft.runners.flotilla import RaySwordfishActorHandle
from daft.runners.partitioning import PartitionCacheEntry, PartitionT
from daft.sql.sql_connection import SQLConnection
from daft.udf.legacy import UDF, BoundUDFArgs, InitArgsType, UninitializedUdf

if TYPE_CHECKING:
    import pyarrow as pa
    import ray
    from pyiceberg.schema import Schema as IcebergSchema
    from pyiceberg.table import TableProperties as IcebergTableProperties

    from daft.ai import Provider
    from daft.catalog import Catalog, Table
    from daft.expressions.visitor import ExpressionVisitor
    from daft.runners.runner import Runner
    from daft.subscribers import Subscriber
    from daft.udf.udf_v2 import ClsBase

# Generic type variable for return values of user-supplied callables.
R = TypeVar("R")

class ImageMode(Enum):
    """Supported image modes for Daft's image type.

    Warning:
        Currently, only the 8-bit modes (L, LA, RGB, RGBA) can be stored in a DataFrame.
        If your binary image data includes other modes, use the `mode` argument
        in `image.decode` to convert the images to a supported mode.
    """

    # NOTE(review): the integer values look like stable discriminants of the
    # native enum — do not renumber; verify against the native definition.

    #: 8-bit grayscale
    L = 1

    #: 8-bit grayscale + alpha
    LA = 2

    #: 8-bit RGB
    RGB = 3

    #: 8-bit RGB + alpha
    RGBA = 4

    #: 16-bit grayscale
    L16 = 5

    #: 16-bit grayscale + alpha
    LA16 = 6

    #: 16-bit RGB
    RGB16 = 7

    #: 16-bit RGB + alpha
    RGBA16 = 8

    #: 32-bit floating RGB
    RGB32F = 9

    #: 32-bit floating RGB + alpha
    RGBA32F = 10

    @staticmethod
    def from_mode_string(mode: str) -> ImageMode:
        """Create an ImageMode from its string representation.

        Args:
            mode: String representation of the mode. This is the same as the enum
                attribute name, e.g. ``ImageMode.from_mode_string("RGB")`` would
                return ``ImageMode.RGB``.
        """
        ...

class ImageProperty(Enum):
    """Supported image properties for Daft's image type."""

    #: Image height
    Height = 1
    #: Image width
    Width = 2
    #: Image channel(s)
    Channel = 3
    #: Image mode (see ``ImageMode``)
    Mode = 4

    @staticmethod
    def from_property_string(attr: str) -> ImageProperty:
        """Create an ImageProperty from its string representation (the attribute name)."""
        ...

class PyWindowBoundary:
    """Represents a window frame boundary in window functions."""

    @staticmethod
    def unbounded_preceding() -> PyWindowBoundary:
        """Boundary representing UNBOUNDED PRECEDING."""
        ...

    @staticmethod
    def unbounded_following() -> PyWindowBoundary:
        """Boundary representing UNBOUNDED FOLLOWING."""
        ...

    @staticmethod
    def offset(n: int) -> PyWindowBoundary:
        """Boundary at a fixed offset ``n`` from the current row."""
        ...

    @staticmethod
    def range_offset(n: PyExpr) -> PyWindowBoundary:
        """Boundary at an expression-valued offset (for RANGE frames)."""
        ...

class WindowFrameType(Enum):
    """Represents the type of window frame (ROWS or RANGE)."""

    #: Frame bounds are counted in physical rows
    Rows = 1
    #: Frame bounds are defined by a value range relative to the current row
    Range = 2

class WindowFrame:
    """Represents a window frame specification."""

    def __init__(
        self,
        start: PyWindowBoundary,
        end: PyWindowBoundary,
    ) -> None:
        """Create a frame spanning from the ``start`` boundary to the ``end`` boundary."""
        ...

class WindowSpec:
    """Represents a window specification for window functions."""

    @staticmethod
    def new() -> WindowSpec:
        """Create an empty window specification."""
        ...

    def with_partition_by(self, exprs: list[PyExpr]) -> WindowSpec:
        """Return a spec partitioned by the given expressions."""
        ...

    def with_order_by(self, exprs: list[PyExpr], ascending: list[bool], nulls_first: list[bool]) -> WindowSpec:
        """Return a spec ordered by ``exprs``; ``ascending``/``nulls_first`` are per-expression flags."""
        ...

    def with_frame(self, frame: WindowFrame) -> WindowSpec:
        """Return a spec using the given window frame."""
        ...

    def with_min_periods(self, min_periods: int) -> WindowSpec:
        """Return a spec requiring at least ``min_periods`` observations in the frame."""
        ...

class ImageFormat(Enum):
    """Supported image formats for Daft's image I/O."""

    PNG = 1
    JPEG = 2
    TIFF = 3
    GIF = 4
    BMP = 5

    @staticmethod
    # NOTE(review): the parameter is named `mode` but carries a *format* string
    # (e.g. "PNG") — the name presumably mirrors the native binding; confirm.
    def from_format_string(mode: str) -> ImageFormat:
        """Create an ImageFormat from its string representation.

        Args:
            mode: String representation of the format, matching the enum
                attribute name, e.g. ``ImageFormat.from_format_string("PNG")``.
        """
        ...

class JoinType(Enum):
    """Type of a join operation."""

    #: Keep only rows that match on both sides
    Inner = 1
    #: Keep all left rows, filling non-matches with nulls
    Left = 2
    #: Keep all right rows, filling non-matches with nulls
    Right = 3
    #: Keep all rows from both sides, filling non-matches with nulls
    Outer = 4
    #: Keep left rows that have at least one match (left columns only)
    Semi = 5
    #: Keep left rows that have no match (left columns only)
    Anti = 6

    @staticmethod
    def from_join_type_str(join_type: str) -> JoinType:
        """Create a JoinType from its string representation.

        Args:
            join_type: String representation of the join type. This is the same as the enum
                attribute name (but snake-case), e.g. ``JoinType.from_join_type_str("inner")`` would
                return ``JoinType.Inner``.
        """
        ...

class JoinStrategy(Enum):
    """Join strategy (algorithm) to use."""

    Hash = 1
    SortMerge = 2
    Broadcast = 3

    @staticmethod
    def from_join_strategy_str(join_strategy: str) -> JoinStrategy:
        """Create a JoinStrategy from its string representation.

        Args:
            join_strategy: String representation of the join strategy. This is the same as the enum
                attribute name (but snake-case), e.g. ``JoinStrategy.from_join_strategy_str("sort_merge")`` would
                return ``JoinStrategy.SortMerge``.
        """
        ...

class JoinSide(Enum):
    """Side of a join (the left or right input)."""

    Left = 1
    Right = 2

class CountMode(Enum):
    """Supported count modes for Daft's count aggregation.

    | All   - Count both non-null and null values.
    | Valid - Count only valid values.
    | Null  - Count only null values.
    """

    All = 1
    Valid = 2
    Null = 3

    @staticmethod
    def from_count_mode_str(count_mode: str) -> CountMode:
        """Create a CountMode from its string representation.

        Args:
            count_mode: String representation of the count mode, e.g. "all", "valid", or "null".
        """
        ...

class ResourceRequest:
    """Resource request for a query fragment task."""

    # None presumably means "no constraint" for each field — confirm against the native impl.
    num_cpus: float | None
    num_gpus: float | None
    memory_bytes: int | None

    def __init__(
        self,
        num_cpus: float | None = None,
        num_gpus: float | None = None,
        memory_bytes: int | None = None,
    ): ...
    @staticmethod
    def max_resources(resource_requests: list[ResourceRequest]) -> ResourceRequest:
        """Take a field-wise max of the list of resource requests."""
        ...

    # Builders returning a new request with one field replaced.
    def with_num_cpus(self, num_cpus: float | None) -> ResourceRequest: ...
    def with_num_gpus(self, num_gpus: float | None) -> ResourceRequest: ...
    def with_memory_bytes(self, memory_bytes: int | None) -> ResourceRequest: ...
    # NOTE(review): __mul__ presumably scales each field by `factor`, and __add__
    # combines requests field-wise — verify in the native implementation.
    def __mul__(self, factor: float) -> ResourceRequest: ...
    def __add__(self, other: ResourceRequest) -> ResourceRequest: ...
    def __repr__(self) -> str: ...
    def __eq__(self, other: ResourceRequest) -> bool: ...  # type: ignore[override]
    def __ne__(self, other: ResourceRequest) -> bool: ...  # type: ignore[override]

class FileFormat(Enum):
    """Format of a file, e.g. Parquet, CSV, and JSON."""

    Parquet = 1
    Csv = 2
    Json = 3

    def ext(self) -> str:
        """Return the file extension string associated with this format."""
        ...

class WriteMode(Enum):
    """Mode for writing data to a file."""

    #: Replace existing data entirely
    Overwrite = 1
    #: Replace only the partitions being written
    OverwritePartitions = 2
    #: Add new data without removing existing data
    Append = 3

    @staticmethod
    def from_str(mode: str) -> WriteMode:
        """Create a WriteMode from its string representation."""
        ...

class ParquetSourceConfig:
    """Configuration of a Parquet data source."""

    # Time unit to coerce INT96 timestamps to
    coerce_int96_timestamp_unit: PyTimeUnit | None
    # Mapping from Parquet field ID to Daft field
    field_id_mapping: dict[int, PyField] | None
    # Row groups to read — presumably one list per file; verify against callers
    row_groups: list[list[int]] | None
    # Read chunk size (rows); None uses the native default
    chunk_size: int | None

    def __init__(
        self,
        coerce_int96_timestamp_unit: PyTimeUnit | None = None,
        field_id_mapping: dict[int, PyField] | None = None,
        row_groups: list[list[int]] | None = None,
        chunk_size: int | None = None,
    ): ...

class CsvSourceConfig:
    """Configuration of a CSV data source."""

    delimiter: str | None
    has_headers: bool
    double_quote: bool
    quote: str | None
    escape_char: str | None
    comment: str | None
    allow_variable_columns: bool
    buffer_size: int | None
    chunk_size: int | None

    # NOTE(review): positional parameter order differs from the attribute
    # declaration order above (required flags come first) — keep callers
    # using keyword arguments to avoid mix-ups.
    def __init__(
        self,
        has_headers: bool,
        double_quote: bool,
        allow_variable_columns: bool,
        delimiter: str | None,
        quote: str | None,
        escape_char: str | None,
        comment: str | None,
        buffer_size: int | None = None,
        chunk_size: int | None = None,
    ): ...

class JsonSourceConfig:
    """Configuration of a JSON data source."""

    # Read buffer size (bytes); None uses the native default
    buffer_size: int | None
    # Read chunk size; None uses the native default
    chunk_size: int | None

    def __init__(
        self,
        buffer_size: int | None = None,
        chunk_size: int | None = None,
    ): ...

class WarcSourceConfig:
    """Configuration of a Warc data source."""

    # No options are currently exposed for WARC sources.
    def __init__(self) -> None: ...

class DatabaseSourceConfig:
    """Configuration of a database data source."""

    # SQL query text to execute against the connection
    sql: str
    conn: SQLConnection

    # NOTE(review): the parameter is named `conn_factory` but is typed (and stored,
    # per the `conn` attribute) as an SQLConnection — confirm against the native binding.
    def __init__(self, sql: str, conn_factory: SQLConnection): ...

class FileFormatConfig:
    """Configuration for parsing a particular file format (Parquet, CSV, JSON)."""

    # The underlying per-format configuration object
    config: ParquetSourceConfig | CsvSourceConfig | JsonSourceConfig | DatabaseSourceConfig | WarcSourceConfig

    @staticmethod
    def from_parquet_config(config: ParquetSourceConfig) -> FileFormatConfig:
        """Create a Parquet file format config."""
        ...

    @staticmethod
    def from_csv_config(config: CsvSourceConfig) -> FileFormatConfig:
        """Create a CSV file format config."""
        ...

    @staticmethod
    def from_json_config(config: JsonSourceConfig) -> FileFormatConfig:
        """Create a JSON file format config."""
        ...

    @staticmethod
    def from_warc_config(config: WarcSourceConfig) -> FileFormatConfig:
        """Create a WARC file format config."""
        ...

    @staticmethod
    def from_database_config(config: DatabaseSourceConfig) -> FileFormatConfig:
        """Create a database file format config."""
        ...

    def file_format(self) -> FileFormat:
        """Get the file format for this config."""
        ...

    def __eq__(self, other: FileFormatConfig) -> bool: ...  # type: ignore[override]
    def __ne__(self, other: FileFormatConfig) -> bool: ...  # type: ignore[override]

class CsvConvertOptions:
    """Options for converting CSV data to Daft data."""

    # Maximum number of rows to produce; None for no limit
    limit: int | None
    # Columns to include in the output; None for all
    include_columns: list[str] | None
    # Explicit column names; None to derive from the file
    column_names: list[str] | None
    # Target schema; None to infer
    schema: PySchema | None
    # Filter predicate applied during the read
    predicate: PyExpr | None

    def __init__(
        self,
        limit: int | None = None,
        include_columns: list[str] | None = None,
        column_names: list[str] | None = None,
        schema: PySchema | None = None,
        predicate: PyExpr | None = None,
    ): ...

class CsvParseOptions:
    """Options for parsing CSV files."""

    has_header: bool
    # Field delimiter; None uses the native default
    delimiter: str | None
    double_quote: bool
    quote: str | None
    # Whether rows may have a varying number of columns
    allow_variable_columns: bool
    escape_char: str | None
    # Line-comment marker; None disables comments
    comment: str | None

    def __init__(
        self,
        has_header: bool = True,
        delimiter: str | None = None,
        double_quote: bool = True,
        quote: str | None = None,
        allow_variable_columns: bool = False,
        escape_char: str | None = None,
        comment: str | None = None,
    ): ...

class CsvReadOptions:
    """Options for reading CSV files."""

    # Read buffer size (bytes); None uses the native default
    buffer_size: int | None
    # Read chunk size; None uses the native default
    chunk_size: int | None

    def __init__(
        self,
        buffer_size: int | None = None,
        chunk_size: int | None = None,
    ): ...

class JsonConvertOptions:
    """Options for converting JSON data to Daft data."""

    # Maximum number of rows to produce; None for no limit
    limit: int | None
    # Columns to include in the output; None for all
    include_columns: list[str] | None
    # Target schema; None to infer
    schema: PySchema | None

    def __init__(
        self,
        limit: int | None = None,
        include_columns: list[str] | None = None,
        schema: PySchema | None = None,
    ): ...

class JsonParseOptions:
    """Options for parsing JSON files."""

    # No parse options are currently exposed.

class JsonReadOptions:
    """Options for reading JSON files."""

    # Read buffer size (bytes); None uses the native default
    buffer_size: int | None
    # Read chunk size; None uses the native default
    chunk_size: int | None

    def __init__(
        self,
        buffer_size: int | None = None,
        chunk_size: int | None = None,
    ): ...

class FileInfo:
    """Metadata for a single file."""

    file_path: str
    # File size in bytes; None when unknown
    file_size: int | None
    # Number of rows in the file; None when unknown
    num_rows: int | None

class FileInfos:
    """Metadata for a collection of files.

    The three lists are parallel: index ``i`` of each describes the same file.
    """

    file_paths: list[str]
    file_sizes: list[int | None]
    num_rows: list[int | None]

    @staticmethod
    def from_infos(file_paths: list[str], file_sizes: list[int | None], num_rows: list[int | None]) -> FileInfos:
        """Build a FileInfos from parallel lists of paths, sizes, and row counts."""
        ...

    def merge(self, new_infos: FileInfos) -> FileInfos:
        """Merge two FileInfos together."""
        ...

    def __getitem__(self, idx: int) -> FileInfo: ...
    def __len__(self) -> int: ...

class HTTPConfig:
    """I/O configuration for accessing HTTP systems."""

    bearer_token: str | None
    retry_initial_backoff_ms: int
    connect_timeout_ms: int
    read_timeout_ms: int
    num_tries: int

    # NOTE(review): the int attributes above are non-optional while __init__
    # accepts None — None presumably selects a native-side default; confirm.
    def __init__(
        self,
        bearer_token: str | None = None,
        retry_initial_backoff_ms: int | None = None,
        connect_timeout_ms: int | None = None,
        read_timeout_ms: int | None = None,
        num_tries: int | None = None,
    ): ...

class S3Config:
    """I/O configuration for accessing an S3-compatible system.

    Args:
        region_name (str, optional): Name of the region to be used (used when accessing AWS S3), defaults to "us-east-1".
            If wrongly provided, Daft will attempt to auto-detect the buckets' region at the cost of extra S3 requests.
        endpoint_url (str, optional): URL to the S3 endpoint, defaults to endpoints to AWS
        key_id (str, optional): AWS Access Key ID, defaults to auto-detection from the current environment
        access_key (str, optional): AWS Secret Access Key, defaults to auto-detection from the current environment
        credentials_provider (Callable[[], S3Credentials], optional): Custom credentials provider function, should return a `S3Credentials` object
        buffer_time (int, optional): Amount of time in seconds before the actual credential expiration time where credentials given by `credentials_provider` are considered expired, defaults to 10s
        max_connections (int, optional): Maximum number of connections to S3 at any time per io thread, defaults to 8
        session_token (str, optional): AWS Session Token, required only if `key_id` and `access_key` are temporary credentials
        retry_initial_backoff_ms (int, optional): Initial backoff duration in milliseconds for an S3 retry, defaults to 1000ms
        connect_timeout_ms (int, optional): Timeout duration to wait to make a connection to S3 in milliseconds, defaults to 30 seconds
        read_timeout_ms (int, optional): Timeout duration to wait to read the first byte from S3 in milliseconds, defaults to 30 seconds
        num_tries (int, optional): Number of attempts to make a connection, defaults to 25
        retry_mode (str, optional): Retry Mode when a request fails, current supported values are `standard` and `adaptive`, defaults to `adaptive`
        anonymous (bool, optional): Whether or not to use "anonymous mode", which will access S3 without any credentials
        use_ssl (bool, optional): Whether or not to use SSL, which require accessing S3 over HTTPS rather than HTTP, defaults to True
        verify_ssl (bool, optional): Whether or not to verify ssl certificates, which will access S3 without checking if the certs are valid, defaults to True
        check_hostname_ssl (bool, optional): Whether or not to verify the hostname when verifying ssl certificates, this was the legacy behavior for openssl, defaults to True
        requester_pays (bool, optional): Whether or not the authenticated user will assume transfer costs, which is required by some providers of bulk data, defaults to False
        force_virtual_addressing (bool, optional): Force S3 client to use virtual addressing in all cases. If False, virtual addressing will only be used if `endpoint_url` is empty, defaults to False
        profile_name (str, optional): Name of AWS_PROFILE to load, defaults to None which will then check the Environment Variable `AWS_PROFILE` then fall back to `default`
        multipart_size (int, optional): The size of multipart part (bytes), the size range should be 5MB to 5GB, defaults to 8MB.
        multipart_max_concurrency (int, optional): The max concurrency of upload part per object, defaults to 100.
        custom_retry_msgs (list[str], optional): Will retry the request if any custom retry message appeared in the error message of response, defaults to None.

    Examples:
        >>> # For AWS S3
        >>> io_config = IOConfig(s3=S3Config(key_id="xxx", access_key="xxx"))
        >>> daft.read_parquet("s3://some-path", io_config=io_config)

        >>> # For S3-compatible services (e.g. Volcengine TOS)
        >>> io_config = IOConfig(
        ...     s3=S3Config(
        ...         endpoint_url="https://tos-s3-{region}.ivolces.com",
        ...         region_name="{region}",
        ...         force_virtual_addressing=True,
        ...         verify_ssl=True,
        ...         key_id="your-access-key-id",
        ...         access_key="your-secret-access-key",
        ...     )
        ... )
        >>> daft.read_parquet("s3://some-path", io_config=io_config)
    """

    region_name: str | None
    endpoint_url: str | None
    key_id: str | None
    session_token: str | None
    access_key: str | None
    credentials_provider: Callable[[], S3Credentials] | None
    max_connections: int
    retry_initial_backoff_ms: int
    connect_timeout_ms: int
    read_timeout_ms: int
    num_tries: int
    retry_mode: str | None
    anonymous: bool
    use_ssl: bool
    verify_ssl: bool
    check_hostname_ssl: bool
    requester_pays: bool | None
    force_virtual_addressing: bool | None
    profile_name: str | None
    multipart_size: int | None
    multipart_max_concurrency: int | None
    custom_retry_msgs: list[str] | None

    # NOTE(review): `buffer_time` (accepted below) has no matching attribute and
    # is absent from `replace()` — presumably consumed by the credential-caching
    # machinery (see `provide_cached_credentials`); confirm against the native impl.
    def __init__(
        self,
        region_name: str | None = None,
        endpoint_url: str | None = None,
        key_id: str | None = None,
        session_token: str | None = None,
        access_key: str | None = None,
        credentials_provider: Callable[[], S3Credentials] | None = None,
        buffer_time: int | None = None,
        max_connections: int | None = None,
        retry_initial_backoff_ms: int | None = None,
        connect_timeout_ms: int | None = None,
        read_timeout_ms: int | None = None,
        num_tries: int | None = None,
        retry_mode: str | None = None,
        anonymous: bool | None = None,
        use_ssl: bool | None = None,
        verify_ssl: bool | None = None,
        check_hostname_ssl: bool | None = None,
        requester_pays: bool | None = None,
        force_virtual_addressing: bool | None = None,
        profile_name: str | None = None,
        multipart_size: int | None = None,
        multipart_max_concurrency: int | None = None,
        custom_retry_msgs: list[str] | None = None,
    ): ...
    def replace(
        self,
        region_name: str | None = None,
        endpoint_url: str | None = None,
        key_id: str | None = None,
        session_token: str | None = None,
        access_key: str | None = None,
        credentials_provider: Callable[[], S3Credentials] | None = None,
        max_connections: int | None = None,
        retry_initial_backoff_ms: int | None = None,
        connect_timeout_ms: int | None = None,
        read_timeout_ms: int | None = None,
        num_tries: int | None = None,
        retry_mode: str | None = None,
        anonymous: bool | None = None,
        use_ssl: bool | None = None,
        verify_ssl: bool | None = None,
        check_hostname_ssl: bool | None = None,
        requester_pays: bool | None = None,
        force_virtual_addressing: bool | None = None,
        profile_name: str | None = None,
        multipart_size: int | None = None,
        multipart_max_concurrency: int | None = None,
        custom_retry_msgs: list[str] | None = None,
    ) -> S3Config:
        """Replaces values if provided, returning a new S3Config."""
        ...

    @staticmethod
    def from_env() -> S3Config:
        """Creates an S3Config, retrieving credentials and configurations from the current environment."""
        ...

    def provide_cached_credentials(self) -> S3Credentials | None:
        """Wrapper around call to `S3Config.credentials_provider` to cache credentials until expiry."""
        ...

class S3Credentials:
    """A set of S3 credentials, optionally with an expiry time."""

    key_id: str
    access_key: str
    session_token: str | None
    # When these credentials expire; None if they do not expire
    expiry: datetime.datetime | None

    def __init__(
        self,
        key_id: str,
        access_key: str,
        session_token: str | None = None,
        expiry: datetime.datetime | None = None,
    ): ...

class AzureConfig:
    """I/O configuration for accessing Azure Blob Storage.

    All fields are optional; ``None`` leaves the setting unspecified
    (presumably falling back to a native-side default — confirm).
    """

    storage_account: str | None
    access_key: str | None
    sas_token: str | None
    bearer_token: str | None
    tenant_id: str | None
    client_id: str | None
    client_secret: str | None
    use_fabric_endpoint: bool | None
    anonymous: bool | None
    # Fixed: these two previously carried `= None` defaults, inconsistent with
    # every other attribute declaration in this stub; declarations only here.
    endpoint_url: str | None
    use_ssl: bool | None

    def __init__(
        self,
        storage_account: str | None = None,
        access_key: str | None = None,
        sas_token: str | None = None,
        bearer_token: str | None = None,
        tenant_id: str | None = None,
        client_id: str | None = None,
        client_secret: str | None = None,
        use_fabric_endpoint: bool | None = None,
        anonymous: bool | None = None,
        endpoint_url: str | None = None,
        use_ssl: bool | None = None,
    ): ...
    def replace(
        self,
        storage_account: str | None = None,
        access_key: str | None = None,
        sas_token: str | None = None,
        bearer_token: str | None = None,
        tenant_id: str | None = None,
        client_id: str | None = None,
        client_secret: str | None = None,
        use_fabric_endpoint: bool | None = None,
        anonymous: bool | None = None,
        endpoint_url: str | None = None,
        use_ssl: bool | None = None,
    ) -> AzureConfig:
        """Replaces values if provided, returning a new AzureConfig."""
        ...

class GCSConfig:
    """I/O configuration for accessing Google Cloud Storage."""

    project_id: str | None
    # Path to / contents of a credentials file — presumably; confirm expected form
    credentials: str | None
    token: str | None
    anonymous: bool
    max_connections: int
    retry_initial_backoff_ms: int
    connect_timeout_ms: int
    read_timeout_ms: int
    num_tries: int

    # For each parameter, None presumably selects the native-side default.
    def __init__(
        self,
        project_id: str | None = None,
        credentials: str | None = None,
        token: str | None = None,
        anonymous: bool | None = None,
        max_connections: int | None = None,
        retry_initial_backoff_ms: int | None = None,
        connect_timeout_ms: int | None = None,
        read_timeout_ms: int | None = None,
        num_tries: int | None = None,
    ): ...
    def replace(
        self,
        project_id: str | None = None,
        credentials: str | None = None,
        token: str | None = None,
        anonymous: bool | None = None,
        max_connections: int | None = None,
        retry_initial_backoff_ms: int | None = None,
        connect_timeout_ms: int | None = None,
        read_timeout_ms: int | None = None,
        num_tries: int | None = None,
    ) -> GCSConfig:
        """Replaces values if provided, returning a new GCSConfig."""
        ...

class UnityConfig:
    """I/O configuration for Unity Catalog volumes."""

    endpoint: str | None
    token: str | None

    def __init__(
        self,
        endpoint: str | None,
        token: str | None,
    ): ...
    # NOTE(review): unlike the other config classes, these parameters have no
    # defaults, so the "if provided" wording below is misleading — callers must
    # always pass both; confirm whether defaults were intended.
    def replace(
        self,
        endpoint: str | None,
        token: str | None,
    ) -> UnityConfig:
        """Replaces values if provided, returning a new UnityConfig."""
        ...

class HuggingFaceConfig:
    """I/O configuration for accessing Hugging Face datasets.

    Args:
        token (str, optional): Your Hugging Face access token, generated from https://huggingface.co/settings/tokens.
        anonymous (bool, optional): Whether or not to use "anonymous mode", which will access Hugging Face without any credentials. Defaults to False.
        use_content_defined_chunking (bool, optional): Set the `use_content_defined_chunking` parameter when creating a `pyarrow.parquet.ParquetWriter`. Only available with pyarrow>=21. Defaults to true if available.
        row_group_size (int, optional): Row group size when writing Parquet files. Defaults to the default `pyarrow.parquet.ParquetWriter` row group size.
        target_filesize (int, optional): Target size in bytes for each written Parquet file. Defaults to 512 MB.
        max_operations_per_commit (int, optional): Maximum number of files to add/copy/delete per commit. Defaults to 100.

    """

    token: str | None
    anonymous: bool
    use_content_defined_chunking: bool
    row_group_size: int | None
    target_filesize: int
    max_operations_per_commit: int

    def __init__(
        self,
        token: str | None = None,
        anonymous: bool | None = None,
        use_content_defined_chunking: bool | None = None,
        row_group_size: int | None = None,
        target_filesize: int | None = None,
        max_operations_per_commit: int | None = None,
    ): ...
    def replace(
        self,
        token: str | None = None,
        anonymous: bool | None = None,
        use_content_defined_chunking: bool | None = None,
        row_group_size: int | None = None,
        target_filesize: int | None = None,
        max_operations_per_commit: int | None = None,
    ) -> HuggingFaceConfig:
        """Replaces values if provided, returning a new HuggingFaceConfig."""
        ...

class TosConfig:
    """I/O configuration for accessing Volcengine TOS (Torch Object Storage).

    Args:
        region (str, optional): Name of the region to be used, defaults to None, it can be detected automatically from endpoint if standard endpoint is set.
        endpoint (str, optional): URL to the TOS endpoint, defaults to None for Volcengine TOS, it can be inferred from region.
        access_key (str, optional): TOS Access Key, defaults to None.
        secret_key (str, optional): TOS Secret Key, defaults to None.
        security_token (str, optional): TOS Security Token, required for temporary credentials, defaults to None.
        anonymous (bool, optional): Whether to use "anonymous mode" or not, which will access TOS without any credentials. Defaults to False.
        max_retries (int, optional): Maximum number of retries for failed requests, defaults to 3.
        retry_timeout_ms (int, optional): Timeout duration for retry attempts in milliseconds, defaults to 30000ms.
        connect_timeout_ms (int, optional): Timeout duration to wait to make a connection to TOS in milliseconds, defaults to 10000ms.
        read_timeout_ms (int, optional): Timeout duration to wait to read the first byte from TOS in milliseconds, defaults to 30000ms.
        max_concurrent_requests (int, optional): Maximum number of concurrent requests to TOS at any time, defaults to 50.
        max_connections_per_io_thread (int, optional): Maximum number of connections to TOS per IO thread, defaults to 50.

    Examples:
        >>> # For Volcengine & byteplus TOS, refer to https://www.volcengine.com/docs/6349/107356
        >>> # or https://docs.byteplus.com/en/docs/tos/docs-region-and-endpoint or get endpoint and region info.
        >>>
        >>> io_config = IOConfig(
        ...     tos=TosConfig(
        ...         region="cn-beijing",
        ...         endpoint="https://tos-cn-beijing.volces.com",
        ...         access_key="your-access-key",
        ...         secret_key="your-secret-key",
        ...     )
        ... )
        >>> daft.read_parquet("tos://some-path", io_config=io_config)
    """

    region: str | None
    endpoint: str | None
    access_key: str | None
    secret_key: str | None
    security_token: str | None
    anonymous: bool
    max_retries: int
    retry_timeout_ms: int
    connect_timeout_ms: int
    read_timeout_ms: int
    max_concurrent_requests: int
    max_connections_per_io_thread: int

    # For each parameter, None presumably selects the documented default above.
    def __init__(
        self,
        region: str | None = None,
        endpoint: str | None = None,
        access_key: str | None = None,
        secret_key: str | None = None,
        security_token: str | None = None,
        anonymous: bool | None = None,
        max_retries: int | None = None,
        retry_timeout_ms: int | None = None,
        connect_timeout_ms: int | None = None,
        read_timeout_ms: int | None = None,
        max_concurrent_requests: int | None = None,
        max_connections_per_io_thread: int | None = None,
    ): ...
    def replace(
        self,
        region: str | None = None,
        endpoint: str | None = None,
        access_key: str | None = None,
        secret_key: str | None = None,
        security_token: str | None = None,
        anonymous: bool | None = None,
        max_retries: int | None = None,
        retry_timeout_ms: int | None = None,
        connect_timeout_ms: int | None = None,
        read_timeout_ms: int | None = None,
        max_concurrent_requests: int | None = None,
        max_connections_per_io_thread: int | None = None,
    ) -> TosConfig:
        """Replaces values if provided, returning a new TosConfig."""
        ...
    @staticmethod
    def from_env() -> TosConfig:
        """Creates a TosConfig, retrieving credentials and configurations from the current environment.

        Environment variables consumed:
        TOS_ENDPOINT: Endpoint of the TOS service.
        TOS_REGION: Region of the TOS service.
        TOS_ACCESS_KEY: Access key for TOS authentication.
        TOS_SECRET_KEY: Secret key for TOS authentication.
        TOS_SECURITY_TOKEN: Security token for TOS authentication.
        """

class IOConfig:
    """Configuration for the native I/O layer, e.g. credentials for accessing cloud storage systems."""

    s3: S3Config
    azure: AzureConfig
    gcs: GCSConfig
    http: HTTPConfig
    unity: UnityConfig
    hf: HuggingFaceConfig
    # Whether to disable suffix range requests — presumably for stores that
    # don't support them; confirm against the native I/O layer.
    disable_suffix_range: bool
    tos: TosConfig

    def __init__(
        self,
        s3: S3Config | None = None,
        azure: AzureConfig | None = None,
        gcs: GCSConfig | None = None,
        http: HTTPConfig | None = None,
        unity: UnityConfig | None = None,
        hf: HuggingFaceConfig | None = None,
        disable_suffix_range: bool | None = None,
        tos: TosConfig | None = None,
    ): ...
    def replace(
        self,
        s3: S3Config | None = None,
        azure: AzureConfig | None = None,
        gcs: GCSConfig | None = None,
        http: HTTPConfig | None = None,
        unity: UnityConfig | None = None,
        hf: HuggingFaceConfig | None = None,
        disable_suffix_range: bool | None = None,
        tos: TosConfig | None = None,
    ) -> IOConfig:
        """Replaces values if provided, returning a new IOConfig."""
        ...

class StorageConfig:
    """Configuration for interacting with a particular storage backend."""

    # Whether or not to use a multithreaded tokio runtime for processing I/O
    multithreaded_io: bool
    # Credentials and per-backend settings for the native I/O layer
    io_config: IOConfig

    def __init__(self, multithreaded_io: bool, io_config: IOConfig | None): ...

class ScanTask:
    """A batch of scan tasks for reading data from an external source."""

    def num_rows(self) -> int:
        """Get number of rows that will be scanned by this ScanTask."""
        ...

    def estimate_in_memory_size_bytes(self, cfg: PyDaftExecutionConfig) -> int:
        """Estimate the In Memory Size of this ScanTask."""
        ...

    # `num_rows`/`size_bytes`/`stats` are optional metadata hints; `pushdowns`
    # carries optimizer pushdowns. Returns None in some cases — the condition
    # is not evident from this stub (NOTE(review): confirm when None is returned).
    @staticmethod
    def catalog_scan_task(
        file: str,
        file_format: FileFormatConfig,
        schema: PySchema,
        storage_config: StorageConfig,
        num_rows: int | None,
        size_bytes: int | None,
        iceberg_delete_files: list[str] | None,
        pushdowns: PyPushdowns | None,
        partition_values: PyRecordBatch | None,
        stats: PyRecordBatch | None,
    ) -> ScanTask | None:
        """Create a Catalog Scan Task."""
        ...

    @staticmethod
    def sql_scan_task(
        url: str,
        file_format: FileFormatConfig,
        schema: PySchema,
        storage_config: StorageConfig,
        num_rows: int | None,
        size_bytes: int | None,
        pushdowns: PyPushdowns | None,
        stats: PyRecordBatch | None,
    ) -> ScanTask:
        """Create a SQL Scan Task."""
        ...

    # The factory function is identified by module + function name so the task
    # can be serialized; `func_args` are passed to it when the task runs.
    @staticmethod
    def python_factory_func_scan_task(
        module: str,
        func_name: str,
        func_args: tuple[Any, ...],
        schema: PySchema,
        num_rows: int | None,
        size_bytes: int | None,
        pushdowns: PyPushdowns | None,
        stats: PyRecordBatch | None,
        source_type: str | None = None,
    ) -> ScanTask:
        """Create a Python factory function Scan Task."""
        ...

class ScanOperatorHandle:
    """A handle to a scan operator."""

    # Builds a handle over an explicit list of file paths with a known schema.
    @staticmethod
    def anonymous_scan(
        files: list[str],
        schema: PySchema,
        file_format_config: FileFormatConfig,
        storage_config: StorageConfig,
    ) -> ScanOperatorHandle: ...
    # Builds a handle by glob-expanding `glob_path`; when `infer_schema` is
    # False a `schema` must presumably be supplied (NOTE(review): confirm).
    @staticmethod
    def glob_scan(
        glob_path: list[str],
        file_format_config: FileFormatConfig,
        storage_config: StorageConfig,
        hive_partitioning: bool,
        infer_schema: bool,
        schema: PySchema | None = None,
        file_path_column: str | None = None,
        skip_glob: bool = False,
    ) -> ScanOperatorHandle: ...
    # Wraps a user-defined Python ScanOperator implementation.
    @staticmethod
    def from_python_scan_operator(operator: ScanOperator) -> ScanOperatorHandle: ...

# Creates a LogicalPlanBuilder whose source is a table scan over `scan_operator`.
def logical_plan_table_scan(scan_operator: ScanOperatorHandle) -> LogicalPlanBuilder: ...

class PyPartitionField:
    """Partitioning Field of a Scan Source such as Hive or Iceberg."""

    # The partition column itself; `source_field` is the data column it is
    # derived from and `transform` how it is derived (both optional).
    field: PyField
    source_field: PyField | None = None
    transform: PyPartitionTransform | None = None

    def __init__(
        self,
        field: PyField,
        source_field: PyField | None = None,
        transform: PyPartitionTransform | None = None,
    ) -> None: ...

class PyPartitionTransform:
    """Partitioning Transform from a Data Catalog source field to a Partitioning Columns."""

    # --- Constructors, one per transform kind ---
    @staticmethod
    def identity() -> PyPartitionTransform: ...
    @staticmethod
    def year() -> PyPartitionTransform: ...
    @staticmethod
    def month() -> PyPartitionTransform: ...
    @staticmethod
    def day() -> PyPartitionTransform: ...
    @staticmethod
    def hour() -> PyPartitionTransform: ...
    @staticmethod
    def iceberg_bucket(n: int) -> PyPartitionTransform: ...
    @staticmethod
    def iceberg_truncate(w: int) -> PyPartitionTransform: ...
    # --- Predicates mirroring the constructors above ---
    def is_identity(self) -> bool: ...
    def is_year(self) -> bool: ...
    def is_month(self) -> bool: ...
    def is_day(self) -> bool: ...
    def is_hour(self) -> bool: ...
    def is_iceberg_bucket(self) -> bool: ...
    def is_iceberg_truncate(self) -> bool: ...
    # Accessors for the transform's parameter: `num_buckets` pairs with
    # iceberg_bucket(n), `width` with iceberg_truncate(w).
    def num_buckets(self) -> int: ...
    def width(self) -> int: ...

class PyPushdowns:
    """Pushdowns from the query optimizer that can optimize scanning data sources."""

    # Column projection; None means all columns are required.
    columns: list[str] | None
    # Row filter predicate and partition-level filter predicate.
    filters: PyExpr | None
    partition_filters: PyExpr | None
    # Maximum number of rows to read; None means no limit.
    limit: int | None
    # Aggregation to push into the scan, if any.
    aggregation: PyExpr | None

    def __init__(
        self,
        columns: list[str] | None = None,
        filters: PyExpr | None = None,
        partition_filters: PyExpr | None = None,
        limit: int | None = None,
        aggregation: PyExpr | None = None,
    ) -> None: ...
    def filter_required_column_names(self) -> list[str]:
        """List of field names that are required by the filter predicate."""
        ...

    def aggregation_required_column_names(self) -> list[str]:
        """List of field names that are required by the aggregation predicate."""
        ...

    def aggregation_count_mode(self) -> CountMode:
        """Count mode of the aggregation predicate."""
        ...

# Shape of the per-file value returned by read_parquet_into_pyarrow(_bulk):
# a pyarrow field, string metadata, array data, and an int whose meaning is
# not evident from this stub (NOTE(review): confirm — possibly a row count).
# `pa` is only imported under TYPE_CHECKING, so this alias is valid in a type
# stub but would raise NameError if this file were executed as a module.
PyArrowParquetType = tuple[pa.Field, dict[str, str], pa.Array, int]

# --- Native Parquet readers. Common parameters: `columns` projects a subset of
# columns, `start_offset`/`num_rows` select a row range, `row_groups` selects
# specific Parquet row groups, `predicate` filters rows, and
# `coerce_int96_timestamp_unit` controls how INT96 timestamps are converted.
# `multithreaded_io=None` defers to the runtime default. ---

# Reads a single Parquet file into a native record batch.
def read_parquet(
    uri: str,
    columns: list[str] | None = None,
    start_offset: int | None = None,
    num_rows: int | None = None,
    row_groups: list[int] | None = None,
    predicate: PyExpr | None = None,
    io_config: IOConfig | None = None,
    multithreaded_io: bool | None = None,
    coerce_int96_timestamp_unit: PyTimeUnit | None = None,
) -> PyRecordBatch: ...

# Reads Parquet metadata statistics for each URI in the input series.
def read_parquet_statistics(
    uris: PySeries,
    io_config: IOConfig | None = None,
    multithreaded_io: bool | None = None,
) -> PyRecordBatch: ...

# Reads a single Parquet file directly into pyarrow data (see PyArrowParquetType).
# `string_encoding="raw"` presumably skips UTF-8 validation/decoding — confirm.
def read_parquet_into_pyarrow(
    uri: str,
    columns: list[str] | None = None,
    start_offset: int | None = None,
    num_rows: int | None = None,
    row_groups: list[int] | None = None,
    io_config: IOConfig | None = None,
    multithreaded_io: bool | None = None,
    coerce_int96_timestamp_unit: PyTimeUnit | None = None,
    string_encoding: Literal["utf-8", "raw"] = "utf-8",
    file_timeout_ms: int | None = None,
) -> PyArrowParquetType: ...

# Bulk variant of read_parquet_into_pyarrow: one result per URI; `row_groups`
# is per-file (parallel to `uris`).
def read_parquet_into_pyarrow_bulk(
    uris: list[str],
    columns: list[str] | None = None,
    start_offset: int | None = None,
    num_rows: int | None = None,
    row_groups: list[list[int] | None] | None = None,
    io_config: IOConfig | None = None,
    num_parallel_tasks: int | None = 128,
    multithreaded_io: bool | None = None,
    coerce_int96_timestamp_unit: PyTimeUnit | None = None,
) -> list[PyArrowParquetType]: ...

# Reads only the schema of a Parquet file.
def read_parquet_schema(
    uri: str,
    io_config: IOConfig | None = None,
    multithreaded_io: bool | None = None,
    coerce_int96_timestamp_unit: PyTimeUnit | None = None,
) -> PySchema: ...
# --- Native CSV / JSON readers. The convert/parse/read options objects carry
# the format-specific configuration; None uses defaults. ---

# Reads a single CSV file into a native record batch.
def read_csv(
    uri: str,
    convert_options: CsvConvertOptions | None = None,
    parse_options: CsvParseOptions | None = None,
    read_options: CsvReadOptions | None = None,
    io_config: IOConfig | None = None,
    multithreaded_io: bool | None = None,
) -> PyRecordBatch: ...

# Infers and returns the schema of a CSV file.
def read_csv_schema(
    uri: str,
    parse_options: CsvParseOptions | None = None,
    io_config: IOConfig | None = None,
    multithreaded_io: bool | None = None,
) -> PySchema: ...

# Reads a single JSON file into a native record batch.
def read_json(
    uri: str,
    convert_options: JsonConvertOptions | None = None,
    parse_options: JsonParseOptions | None = None,
    read_options: JsonReadOptions | None = None,
    io_config: IOConfig | None = None,
    multithreaded_io: bool | None = None,
    max_chunks_in_flight: int | None = None,
) -> PyRecordBatch: ...

# Infers and returns the schema of a JSON file.
def read_json_schema(
    uri: str,
    parse_options: JsonParseOptions | None = None,
    io_config: IOConfig | None = None,
    multithreaded_io: bool | None = None,
) -> PySchema: ...

class PyTimeUnit:
    """Time-unit resolution for temporal types (nanoseconds through seconds)."""

    @staticmethod
    def nanoseconds() -> PyTimeUnit: ...
    @staticmethod
    def microseconds() -> PyTimeUnit: ...
    @staticmethod
    def milliseconds() -> PyTimeUnit: ...
    @staticmethod
    def seconds() -> PyTimeUnit: ...
    # Parses a unit name string into a PyTimeUnit.
    @staticmethod
    def from_str(unit: str) -> PyTimeUnit: ...

# (callable, args) pair as returned by the __reduce__ pickling protocol.
ReduceReturnType = tuple[Callable[..., Any], tuple[Any, ...]]

class PyDataType:
    """Native Daft data type: static factory constructors, `is_*` predicates,
    and accessors for type parameters (sizes, shapes, units, fields)."""

    # --- Factory constructors, one per supported type ---
    @staticmethod
    def null() -> PyDataType: ...
    @staticmethod
    def bool() -> PyDataType: ...
    @staticmethod
    def int8() -> PyDataType: ...
    @staticmethod
    def int16() -> PyDataType: ...
    @staticmethod
    def int32() -> PyDataType: ...
    @staticmethod
    def int64() -> PyDataType: ...
    @staticmethod
    def uint8() -> PyDataType: ...
    @staticmethod
    def uint16() -> PyDataType: ...
    @staticmethod
    def uint32() -> PyDataType: ...
    @staticmethod
    def uint64() -> PyDataType: ...
    @staticmethod
    def float32() -> PyDataType: ...
    @staticmethod
    def float64() -> PyDataType: ...
    @staticmethod
    def binary() -> PyDataType: ...
    @staticmethod
    def fixed_size_binary(size: int) -> PyDataType: ...
    @staticmethod
    def string() -> PyDataType: ...
    # NOTE(review): `size` appears to be the decimal *scale* (cf. the scale()
    # accessor below) — confirm; renaming it would break keyword callers.
    @staticmethod
    def decimal128(precision: int, size: int) -> PyDataType: ...
    @staticmethod
    def date() -> PyDataType: ...
    @staticmethod
    def time(time_unit: PyTimeUnit) -> PyDataType: ...
    # `timezone=None` produces a timezone-naive timestamp type.
    @staticmethod
    def timestamp(time_unit: PyTimeUnit, timezone: str | None = None) -> PyDataType: ...
    @staticmethod
    def duration(time_unit: PyTimeUnit) -> PyDataType: ...
    @staticmethod
    def interval() -> PyDataType: ...
    @staticmethod
    def list(data_type: PyDataType) -> PyDataType: ...
    @staticmethod
    def fixed_size_list(data_type: PyDataType, size: int) -> PyDataType: ...
    @staticmethod
    def map(key_type: PyDataType, value_type: PyDataType) -> PyDataType: ...
    @staticmethod
    def struct(fields: dict[str, PyDataType]) -> PyDataType: ...
    @staticmethod
    def extension(name: str, storage_data_type: PyDataType, metadata: str | None = None) -> PyDataType: ...
    @staticmethod
    def embedding(data_type: PyDataType, size: int) -> PyDataType: ...
    # Omitting mode/height/width yields a variable-shape image type.
    @staticmethod
    def image(
        mode: ImageMode | None = None,
        height: int | None = None,
        width: int | None = None,
    ) -> PyDataType: ...
    # `shape=None` yields a variable-shape tensor type.
    @staticmethod
    def tensor(dtype: PyDataType, shape: tuple[int, ...] | None = None) -> PyDataType: ...
    @staticmethod
    def sparse_tensor(
        dtype: PyDataType, shape: tuple[int, ...] | None = None, use_offset_indices: builtins.bool = False
    ) -> PyDataType: ...
    @staticmethod
    def python() -> PyDataType: ...
    @staticmethod
    def file(media_type: PyMediaType) -> PyDataType: ...
    # Converts to the equivalent pyarrow DataType.
    def to_arrow(self, cast_tensor_type_for_ray: builtins.bool | None = None) -> pa.DataType: ...
    # --- Type predicates, mirroring the constructors above ---
    def is_null(self) -> builtins.bool: ...
    def is_boolean(self) -> builtins.bool: ...
    def is_int8(self) -> builtins.bool: ...
    def is_int16(self) -> builtins.bool: ...
    def is_int32(self) -> builtins.bool: ...
    def is_int64(self) -> builtins.bool: ...
    def is_uint8(self) -> builtins.bool: ...
    def is_uint16(self) -> builtins.bool: ...
    def is_uint32(self) -> builtins.bool: ...
    def is_uint64(self) -> builtins.bool: ...
    def is_float32(self) -> builtins.bool: ...
    def is_float64(self) -> builtins.bool: ...
    def is_decimal128(self) -> builtins.bool: ...
    def is_timestamp(self) -> builtins.bool: ...
    def is_date(self) -> builtins.bool: ...
    def is_time(self) -> builtins.bool: ...
    def is_duration(self) -> builtins.bool: ...
    def is_interval(self) -> builtins.bool: ...
    def is_binary(self) -> builtins.bool: ...
    def is_fixed_size_binary(self) -> builtins.bool: ...
    def is_string(self) -> builtins.bool: ...
    def is_list(self) -> builtins.bool: ...
    def is_fixed_size_list(self) -> builtins.bool: ...
    def is_struct(self) -> builtins.bool: ...
    def is_map(self) -> builtins.bool: ...
    def is_extension(self) -> builtins.bool: ...
    def is_image(self) -> builtins.bool: ...
    def is_fixed_shape_image(self) -> builtins.bool: ...
    def is_embedding(self) -> builtins.bool: ...
    def is_tensor(self) -> builtins.bool: ...
    def is_fixed_shape_tensor(self) -> builtins.bool: ...
    def is_sparse_tensor(self) -> builtins.bool: ...
    def is_fixed_shape_sparse_tensor(self) -> builtins.bool: ...
    def is_python(self) -> builtins.bool: ...
    # --- Category predicates (groups of the concrete types above) ---
    def is_numeric(self) -> builtins.bool: ...
    def is_integer(self) -> builtins.bool: ...
    def is_logical(self) -> builtins.bool: ...
    def is_temporal(self) -> builtins.bool: ...
    def is_file(self) -> builtins.bool: ...
    # --- Accessors for type parameters; each is presumably only valid on the
    # matching type kind (NOTE(review): error behavior on mismatch not shown) ---
    def fixed_size(self) -> int: ...
    def fixed_shape(self) -> tuple[int, ...]: ...
    def time_unit(self) -> PyTimeUnit: ...
    def time_zone(self) -> str | None: ...
    def dtype(self) -> PyDataType: ...
    def fields(self) -> builtins.list[PyField]: ...
    def precision(self) -> int: ...
    def scale(self) -> int: ...
    def image_mode(self) -> ImageMode | None: ...
    def use_offset_indices(self) -> builtins.bool: ...
    def key_type(self) -> PyDataType: ...
    def value_type(self) -> PyDataType: ...
    def is_equal(self, other: Any) -> builtins.bool: ...
    @staticmethod
    def from_json(serialized: str) -> PyDataType: ...
    # Pickle support.
    def __reduce__(self) -> ReduceReturnType: ...
    def __hash__(self) -> int: ...

class PyField:
    """A named, typed field (optionally with string metadata) within a schema."""

    def name(self) -> str: ...
    @staticmethod
    def create(name: str, datatype: PyDataType, metadata: dict[str, str] | None = None) -> PyField: ...
    # Returns a new field carrying the given metadata.
    def with_metadata(self, metadata: dict[str, str]) -> PyField: ...
    def dtype(self) -> PyDataType: ...
    def eq(self, other: PyField) -> bool: ...
    # Pickle support.
    def __reduce__(self) -> ReduceReturnType: ...

class PySchema:
    """An ordered collection of PyFields describing the columns of a table."""

    # Field lookup by column name.
    def __getitem__(self, name: str) -> PyField: ...
    def names(self) -> list[str]: ...
    def union(self, other: PySchema) -> PySchema: ...
    def __eq__(self, other: object) -> bool: ...
    # Estimated bytes per row based on the field types.
    def estimate_row_size_bytes(self) -> float: ...
    @staticmethod
    def from_field_name_and_types(names_and_types: list[tuple[str, PyDataType]]) -> PySchema: ...
    @staticmethod
    def from_fields(fields: list[PyField]) -> PySchema: ...
    def to_pyarrow_schema(self) -> pa.Schema: ...
    # Pickle support.
    def __reduce__(self) -> ReduceReturnType: ...
    # --- Display helpers ---
    def __repr__(self) -> str: ...
    def _repr_html_(self) -> str: ...
    def _truncated_table_html(self) -> str: ...
    def _truncated_table_string(self) -> str: ...
    def apply_hints(self, hints: PySchema) -> PySchema: ...
    def display_with_metadata(self, include_metadata: bool = False) -> str: ...
    # Name of the column with the smallest estimated size, or None if empty
    # (NOTE(review): None condition presumed — confirm).
    def min_estimated_size_column(self) -> str | None: ...

class PyExpr:
    """A native Daft expression node. Methods build new expressions
    (projections, aggregations, operators) without evaluating anything."""

    def alias(self, name: str) -> PyExpr: ...
    def cast(self, dtype: PyDataType) -> PyExpr: ...
    def if_else(self, if_true: PyExpr, if_false: PyExpr) -> PyExpr: ...
    # --- Aggregation expressions ---
    def count(self, mode: CountMode) -> PyExpr: ...
    def count_distinct(self) -> PyExpr: ...
    def sum(self) -> PyExpr: ...
    def product(self) -> PyExpr: ...
    def approx_count_distinct(self) -> PyExpr: ...
    def approx_percentiles(self, percentiles: float | list[float]) -> PyExpr: ...
    def mean(self) -> PyExpr: ...
    def stddev(self) -> PyExpr: ...
    def min(self) -> PyExpr: ...
    def max(self) -> PyExpr: ...
    def bool_and(self) -> PyExpr: ...
    def bool_or(self) -> PyExpr: ...
    def any_value(self, ignore_nulls: bool) -> PyExpr: ...
    def skew(self) -> PyExpr: ...
    def agg_list(self) -> PyExpr: ...
    def agg_set(self) -> PyExpr: ...
    def agg_concat(self) -> PyExpr: ...
    # --- Window functions ---
    def over(self, window_spec: WindowSpec) -> PyExpr: ...
    def offset(self, offset: int, default: PyExpr | None = None) -> PyExpr: ...
    # --- Operator overloads (each builds a binary/unary expression node) ---
    def __add__(self, other: PyExpr) -> PyExpr: ...
    def __sub__(self, other: PyExpr) -> PyExpr: ...
    def __mul__(self, other: PyExpr) -> PyExpr: ...
    def __floordiv__(self, other: PyExpr) -> PyExpr: ...
    def __truediv__(self, other: PyExpr) -> PyExpr: ...
    def __mod__(self, other: PyExpr) -> PyExpr: ...
    def __and__(self, other: PyExpr) -> PyExpr: ...
    def __or__(self, other: PyExpr) -> PyExpr: ...
    def __xor__(self, other: PyExpr) -> PyExpr: ...
    def __invert__(self) -> PyExpr: ...
    def __lt__(self, other: PyExpr) -> PyExpr: ...
    def __le__(self, other: PyExpr) -> PyExpr: ...
    def __gt__(self, other: PyExpr) -> PyExpr: ...
    def __ge__(self, other: PyExpr) -> PyExpr: ...
    # __eq__/__ne__ deliberately return expressions, not bools — hence the
    # type: ignore overrides.
    def __eq__(self, other: PyExpr) -> PyExpr: ...  # type: ignore[override]
    def __ne__(self, other: PyExpr) -> PyExpr: ...  # type: ignore[override]
    def __rshift__(self, other: PyExpr) -> PyExpr: ...
    def __lshift__(self, other: PyExpr) -> PyExpr: ...
    # --- Null handling ---
    def is_null(self) -> PyExpr: ...
    def not_null(self) -> PyExpr: ...
    def fill_null(self, fill_value: PyExpr) -> PyExpr: ...
    def eq_null_safe(self, other: PyExpr) -> PyExpr: ...
    def is_in(self, other: list[PyExpr]) -> PyExpr: ...
    def between(self, lower: PyExpr, upper: PyExpr) -> PyExpr: ...
    def name(self) -> str: ...
    # Resolves this expression against a schema, yielding its output field.
    def to_field(self, schema: PySchema) -> PyField: ...
    def to_sql(self) -> str: ...
    def __repr__(self) -> str: ...
    def __hash__(self) -> int: ...
    # Pickle support.
    def __reduce__(self) -> ReduceReturnType: ...
    def struct_get(self, name: str) -> PyExpr: ...
    def map_get(self, key: PyExpr) -> PyExpr: ...
    # --- Partitioning transforms (cf. PyPartitionTransform) ---
    def partitioning_days(self) -> PyExpr: ...
    def partitioning_hours(self) -> PyExpr: ...
    def partitioning_months(self) -> PyExpr: ...
    def partitioning_years(self) -> PyExpr: ...
    def partitioning_iceberg_bucket(self, n: int) -> PyExpr: ...
    def partitioning_iceberg_truncate(self, w: int) -> PyExpr: ...
    # Builds a vLLM inference expression; `engine_args`/`generate_args` are
    # forwarded opaquely to the engine (semantics not visible here).
    def vllm(
        self,
        model: str,
        concurrency: int,
        gpus_per_actor: int,
        do_prefix_routing: bool,
        max_buffer_size: int,
        min_bucket_size: int,
        prefix_match_threshold: float,
        load_balance_threshold: int,
        batch_size: int | None,
        engine_args: Any,
        generate_args: Any,
    ) -> PyExpr: ...

    ###
    # Visitor methods
    ###

    def accept(self, visitor: ExpressionVisitor[R]) -> R: ...

    ###
    # Helper methods from Expr from Eq Hash traits
    ###

    # True equality/hash helpers (as opposed to the expression-building
    # __eq__/__ne__ overloads above).
    def _eq(self, other: PyExpr) -> bool: ...
    def _ne(self, other: PyExpr) -> bool: ...
    def _hash(self) -> int: ...

    ###
    # Conversion methods
    ###

    def as_py(self) -> Any: ...

    ###
    # Helper methods required by optimizer:
    # These should be removed from the Python API for Expressions when logical plans and optimizer are migrated to Rust
    ###
    def _input_mapping(self) -> builtins.str | None: ...

# --- Expression constructors: column references and literals. ---

# Structural equality between two expressions.
def eq(expr1: PyExpr, expr2: PyExpr) -> bool: ...
# Column references at increasing stages of resolution: by name only, resolved
# against a schema, or bound to a column index + field.
def unresolved_col(name: str) -> PyExpr: ...
def resolved_col(name: str) -> PyExpr: ...
def bound_col(index: int, field: PyField) -> PyExpr: ...
# Generic literal from an arbitrary Python value.
def lit(item: Any) -> PyExpr: ...
def list_(items: list[PyExpr]) -> PyExpr: ...
# Temporal literals take raw integer values plus (where applicable) a time unit.
def date_lit(item: int) -> PyExpr: ...
def time_lit(item: int, tu: PyTimeUnit) -> PyExpr: ...
def timestamp_lit(item: int, tu: PyTimeUnit, tz: str | None) -> PyExpr: ...
def duration_lit(item: int, tu: PyTimeUnit) -> PyExpr: ...
# Any component of the interval may be omitted (None).
def interval_lit(
    years: int | None,
    months: int | None,
    days: int | None,
    hours: int | None,
    minutes: int | None,
    seconds: int | None,
    millis: int | None,
    nanos: int | None,
) -> PyExpr: ...
# Decimal literal from sign, digit tuple, and exponent.
def decimal_lit(sign: bool, digits: tuple[int, ...], exp: int) -> PyExpr: ...
# List literal backed by an existing series.
def list_lit(item: PySeries) -> PyExpr: ...
# Builds a legacy-UDF expression over `expressions`. `inner`/`bound_args`
# carry the uninitialized UDF and its bound arguments; `concurrency`,
# `resource_request`, `use_process`, and `ray_options` control how it is
# scheduled at execution time.
def udf(
    name: str,
    inner: UninitializedUdf,
    bound_args: BoundUDFArgs,
    expressions: list[PyExpr],
    return_dtype: PyDataType,
    init_args: InitArgsType,
    resource_request: ResourceRequest | None,
    batch_size: int | None,
    concurrency: int | None,
    use_process: bool | None,
    ray_options: dict[str, Any] | None = None,
) -> PyExpr: ...
# Builds a v2 row-wise UDF expression: `method` (bound on `cls`, possibly
# async) is applied per row to `expr_args`; `max_retries`/`on_error` control
# failure handling and `original_args` preserves the user's call for display.
def row_wise_udf(
    name: str,
    cls: ClsBase[Any],
    method: Callable[Concatenate[Any, ...], Any],
    is_async: bool,
    return_dtype: PyDataType,
    gpus: int,
    use_process: bool | None,
    max_concurrency: int | None,
    max_retries: int | None,
    on_error: str | None,
    original_args: tuple[tuple[Any, ...], dict[str, Any]],
    expr_args: list[PyExpr],
) -> PyExpr: ...
# Batch variant of row_wise_udf: identical parameters plus `batch_size`,
# which controls how many rows are handed to `method` per invocation.
def batch_udf(
    name: str,
    cls: ClsBase[Any],
    method: Callable[Concatenate[Any, ...], Any],
    is_async: bool,
    return_dtype: PyDataType,
    gpus: int,
    use_process: bool | None,
    max_concurrency: int | None,
    batch_size: int | None,
    max_retries: int | None,
    on_error: str | None,
    original_args: tuple[tuple[Any, ...], dict[str, Any]],
    expr_args: list[PyExpr],
) -> PyExpr: ...
# Initializes any UDFs contained in `expression`, returning the new expression.
def initialize_udfs(expression: PyExpr) -> PyExpr: ...
# Resolves `expr` against `schema`, returning the resolved expression and
# its output field.
def resolve_expr(expr: PyExpr, schema: PySchema) -> tuple[PyExpr, PyField]: ...
# Window-function expression constructors (use with PyExpr.over).
def row_number() -> PyExpr: ...
def rank() -> PyExpr: ...
def dense_rank() -> PyExpr: ...

# -----
# SQL functions
# -----
class SQLFunctionStub:
    """Read-only descriptor of a registered SQL function (see list_sql_functions)."""

    @property
    def name(self) -> str: ...
    @property
    def docstring(self) -> str: ...
    @property
    def arg_names(self) -> list[str]: ...

# Executes a SQL statement in `session`; `ctes` supplies named subplans.
# Returns None for statements that produce no result (NOTE(review): the
# non-None result type is opaque here — confirm).
def sql_exec(
    source: str, session: PySession, ctes: dict[str, LogicalPlanBuilder], config: PyDaftPlanningConfig
) -> object | None: ...
# Parses a SQL scalar expression / type string into the native object.
def sql_expr(sql: str) -> PyExpr: ...
def sql_datatype(sql: str) -> PyDataType: ...
# Lists all SQL functions registered with the native engine.
def list_sql_functions() -> list[SQLFunctionStub]: ...
# Starts a Spark Connect server at `addr`; port 0 lets the OS pick a port
# (read it back via ConnectionHandle.port()).
def connect_start(addr: str = "sc://0.0.0.0:0") -> ConnectionHandle: ...

class ConnectionHandle:
    """Handle to a running connect server (see connect_start)."""

    def shutdown(self) -> None: ...
    # The port the server is actually listening on.
    def port(self) -> int: ...

# ---
# expr.list namespace
# ---
# Each function below builds an expression operating element-wise on a list
# column. Parameters such as `desc`/`nulls_first`/`idx`/`default` are passed
# as expressions so they may vary per row.
def list_sort(expr: PyExpr, desc: PyExpr, nulls_first: PyExpr) -> PyExpr: ...
def list_distinct(expr: PyExpr) -> PyExpr: ...
def list_value_counts(expr: PyExpr) -> PyExpr: ...
def list_join(expr: PyExpr, delimiter: PyExpr) -> PyExpr: ...
def list_count(expr: PyExpr, mode: CountMode) -> PyExpr: ...
def list_get(expr: PyExpr, idx: PyExpr, default: PyExpr) -> PyExpr: ...
def list_sum(expr: PyExpr) -> PyExpr: ...
def list_mean(expr: PyExpr) -> PyExpr: ...
def list_min(expr: PyExpr) -> PyExpr: ...
def list_max(expr: PyExpr) -> PyExpr: ...
def list_bool_and(expr: PyExpr) -> PyExpr: ...
def list_bool_or(expr: PyExpr) -> PyExpr: ...
def list_slice(expr: PyExpr, start: PyExpr, end: PyExpr | None = None) -> PyExpr: ...
# Splits each list into chunks of `size` elements.
def list_chunk(expr: PyExpr, size: int) -> PyExpr: ...

class PySeries:
    """A single named, typed column of data backed by the native engine.
    Unlike PyExpr, these methods evaluate eagerly on concrete data."""

    # --- Construction and conversion ---
    @staticmethod
    def from_arrow(name: str, pyarrow_array: pa.Array, dtype: PyDataType | None = None) -> PySeries: ...
    @staticmethod
    def from_pylist(pylist: list[Any], name: str | None = None, dtype: PyDataType | None = None) -> PySeries: ...
    def to_pylist(self) -> list[Any]: ...
    def to_arrow(self) -> pa.Array: ...
    def __iter__(self) -> PySeriesIterator: ...
    def __getitem__(self, index: int) -> Any: ...
    # --- Element-wise operators (each returns a new series) ---
    def __abs__(self) -> PySeries: ...
    def __add__(self, other: PySeries) -> PySeries: ...
    def __sub__(self, other: PySeries) -> PySeries: ...
    def __mul__(self, other: PySeries) -> PySeries: ...
    def __truediv__(self, other: PySeries) -> PySeries: ...
    def __mod__(self, other: PySeries) -> PySeries: ...
    def __and__(self, other: PySeries) -> PySeries: ...
    def __or__(self, other: PySeries) -> PySeries: ...
    def __xor__(self, other: PySeries) -> PySeries: ...
    def __lt__(self, other: PySeries) -> PySeries: ...
    def __le__(self, other: PySeries) -> PySeries: ...
    def __gt__(self, other: PySeries) -> PySeries: ...
    def __ge__(self, other: PySeries) -> PySeries: ...
    # __eq__/__ne__ return element-wise comparison series, not bools — hence
    # the type: ignore overrides.
    def __eq__(self, other: PySeries) -> PySeries: ...  # type: ignore[override]
    def __ne__(self, other: PySeries) -> PySeries: ...  # type: ignore[override]
    def __rshift__(self, other: PySeries) -> PySeries: ...
    def __lshift__(self, other: PySeries) -> PySeries: ...
    def __floordiv__(self, other: PySeries) -> PySeries: ...
    # --- Selection and ordering ---
    def take(self, idx: PySeries) -> PySeries: ...
    def slice(self, start: int, end: int) -> PySeries: ...
    def filter(self, mask: PySeries) -> PySeries: ...
    def sort(self, descending: bool, nulls_first: bool) -> PySeries: ...
    def argsort(self, descending: bool, nulls_first: bool) -> PySeries: ...
    # --- Hashing ---
    def hash(self, seed: PySeries | None = None) -> PySeries: ...
    def minhash(
        self,
        num_hashes: int,
        ngram_size: int,
        seed: int = 1,
        hash_function: Literal["murmurhash3", "xxhash", "xxhash3_64", "xxhash64", "xxhash32", "sha1"] = "murmurhash3",
    ) -> PySeries: ...
    def __invert__(self) -> PySeries: ...
    # --- Aggregations (each reduces the series) ---
    def count(self, mode: CountMode) -> PySeries: ...
    def count_distinct(self) -> PySeries: ...
    def sum(self) -> PySeries: ...
    def product(self) -> PySeries: ...
    def mean(self) -> PySeries: ...
    def stddev(self) -> PySeries: ...
    def min(self) -> PySeries: ...
    def max(self) -> PySeries: ...
    def agg_list(self) -> PySeries: ...
    def agg_set(self) -> PySeries: ...
    # --- Math / casting ---
    def cast(self, dtype: PyDataType) -> PySeries: ...
    def pow(self, exp: float) -> PySeries: ...
    def log2(self) -> PySeries: ...
    def log10(self) -> PySeries: ...
    def log(self, base: float) -> PySeries: ...
    def ln(self) -> PySeries: ...
    def log1p(self) -> PySeries: ...
    @staticmethod
    def concat(series: list[PySeries]) -> PySeries: ...
    # --- Metadata ---
    def __len__(self) -> int: ...
    def size_bytes(self) -> int: ...
    def name(self) -> str: ...
    def rename(self, name: str) -> PySeries: ...
    def data_type(self) -> PyDataType: ...
    # --- Partitioning transforms (cf. PyPartitionTransform) ---
    def partitioning_days(self) -> PySeries: ...
    def partitioning_hours(self) -> PySeries: ...
    def partitioning_months(self) -> PySeries: ...
    def partitioning_years(self) -> PySeries: ...
    def partitioning_iceberg_bucket(self, n: int) -> PySeries: ...
    def partitioning_iceberg_truncate(self, w: int) -> PySeries: ...
    # --- List / map element operations ---
    def list_count(self, mode: CountMode) -> PySeries: ...
    def list_get(self, idx: PySeries, default: PySeries) -> PySeries: ...
    def list_slice(self, start: PySeries, end: PySeries | None = None) -> PySeries: ...
    def list_sort(self, desc: PySeries, nulls_first: PySeries) -> PySeries: ...
    def map_get(self, key: PySeries) -> PySeries: ...
    # --- Null handling / misc ---
    def if_else(self, other: PySeries, predicate: PySeries) -> PySeries: ...
    def is_null(self) -> PySeries: ...
    def not_null(self) -> PySeries: ...
    def fill_null(self, fill_value: PySeries) -> PySeries: ...
    def murmur3_32(self) -> PySeries: ...
    def to_str_values(self) -> PySeries: ...
    # Debug-only bincode round-trip helpers.
    def _debug_bincode_serialize(self) -> bytes: ...
    @staticmethod
    def _debug_bincode_deserialize(b: bytes) -> PySeries: ...

class PySeriesIterator:
    """Iterator over the elements of a PySeries (returned by PySeries.__iter__)."""

    def __next__(self) -> Any: ...
    def __iter__(self) -> PySeriesIterator: ...

class PyShowOptions:
    """Opaque native options object; no members are exposed to Python."""

    pass

class OperatorMetrics:
    """Metrics sink for an operator; counters are identified by name."""

    # Increments counter `name` by `value`; `description` and `attributes`
    # are optional counter metadata.
    def inc_counter(
        self,
        name: str,
        value: int,
        *,
        description: str | None = None,
        attributes: dict[str, str] | None = None,
    ) -> None: ...

class PyRecordBatch:
    """An in-memory table (schema + columns) with eager relational operations."""

    def schema(self) -> PySchema: ...
    # Evaluates expressions over this batch; the metrics variant also reports
    # per-operator counters.
    def eval_expression_list(self, exprs: list[PyExpr]) -> PyRecordBatch: ...
    def eval_expression_list_with_metrics(self, exprs: list[PyExpr]) -> tuple[PyRecordBatch, OperatorMetrics]: ...
    # --- Row selection and ordering ---
    def take(self, idx: PySeries) -> PyRecordBatch: ...
    def filter(self, exprs: list[PyExpr]) -> PyRecordBatch: ...
    def sort(self, sort_keys: list[PyExpr], descending: list[bool], nulls_first: list[bool]) -> PyRecordBatch: ...
    def argsort(self, sort_keys: list[PyExpr], descending: list[bool], nulls_first: list[bool]) -> PySeries: ...
    # --- Aggregation and reshaping ---
    def agg(self, to_agg: list[PyExpr], group_by: list[PyExpr]) -> PyRecordBatch: ...
    def pivot(
        self,
        group_by: list[PyExpr],
        pivot_column: PyExpr,
        values_column: PyExpr,
        names: list[str],
    ) -> PyRecordBatch: ...
    # --- Joins ---
    def hash_join(
        self,
        right: PyRecordBatch,
        left_on: list[PyExpr],
        right_on: list[PyExpr],
        how: JoinType,
    ) -> PyRecordBatch: ...
    # `is_sorted` indicates the inputs are already sorted on the join keys.
    def sort_merge_join(
        self,
        right: PyRecordBatch,
        left_on: list[PyExpr],
        right_on: list[PyExpr],
        is_sorted: bool,
    ) -> PyRecordBatch: ...
    def explode(self, to_explode: list[PyExpr]) -> PyRecordBatch: ...
    def head(self, num: int) -> PyRecordBatch: ...
    # --- Sampling; `seed=None` presumably means nondeterministic (confirm) ---
    def sample_by_fraction(self, fraction: float, with_replacement: bool, seed: int | None) -> PyRecordBatch: ...
    def sample_by_size(self, size: int, with_replacement: bool, seed: int | None) -> PyRecordBatch: ...
    def quantiles(self, num: int) -> PyRecordBatch: ...
    # --- Partitioning into multiple batches ---
    def partition_by_hash(self, exprs: list[PyExpr], num_partitions: int) -> list[PyRecordBatch]: ...
    def partition_by_random(self, num_partitions: int, seed: int) -> list[PyRecordBatch]: ...
    def partition_by_range(
        self, partition_keys: list[PyExpr], boundaries: PyRecordBatch, descending: list[bool]
    ) -> list[PyRecordBatch]: ...
    # Also returns a batch of the distinct partition key values.
    def partition_by_value(self, partition_keys: list[PyExpr]) -> tuple[list[PyRecordBatch], PyRecordBatch]: ...
    def add_monotonically_increasing_id(self, partition_num: int, column_name: str) -> PyRecordBatch: ...
    # --- Display ---
    def preview(self, format: str | None, options: str | None) -> str: ...
    def __repr__(self) -> str: ...
    def _repr_html_(self) -> str: ...
    # --- Metadata and column access ---
    def __len__(self) -> int: ...
    def size_bytes(self) -> int: ...
    def get_column(self, idx: int) -> PySeries: ...
    def columns(self) -> list[PySeries]: ...
    @staticmethod
    def concat(tables: list[PyRecordBatch]) -> PyRecordBatch: ...
    def slice(self, start: int, end: int) -> PyRecordBatch: ...
    # --- Conversion to/from Arrow, file infos, and IPC bytes ---
    @staticmethod
    def from_arrow_record_batches(record_batches: list[pa.RecordBatch], schema: PySchema) -> PyRecordBatch: ...
    @staticmethod
    def from_pylist_series(dict: dict[str, PySeries]) -> PyRecordBatch: ...
    @staticmethod
    def from_pyseries_list(list: list[PySeries]) -> PyRecordBatch: ...
    def to_arrow_record_batch(self) -> pa.RecordBatch: ...
    @staticmethod
    def empty(schema: PySchema | None = None) -> PyRecordBatch: ...
    @staticmethod
    def from_file_infos(file_infos: FileInfos) -> PyRecordBatch: ...
    def to_file_infos(self) -> FileInfos: ...
    @staticmethod
    def from_ipc_stream(bytes: bytes) -> PyRecordBatch: ...
    def to_ipc_stream(self) -> bytes: ...

class PyMicroPartition:
    """Stub for a micro-partition: a collection of record batches sharing one schema.

    Mirrors much of the PyRecordBatch surface (filter/sort/join/partition/...),
    plus batch-level constructors and file readers. All bodies are `...`
    placeholders for the native implementation.
    """

    # --- schema / column access ---
    def schema(self) -> PySchema: ...
    def column_names(self) -> list[str]: ...
    def get_column_by_name(self, name: str) -> PySeries: ...
    def get_column(self, idx: int) -> PySeries: ...
    def columns(self) -> list[PySeries]: ...
    def get_record_batches(self) -> list[PyRecordBatch]: ...
    # May be None, unlike PyRecordBatch.size_bytes (presumably when size is
    # unknown/lazy — TODO confirm against the native implementation).
    def size_bytes(self) -> int | None: ...
    def _repr_html_(self) -> str: ...
    # --- constructors ---
    @staticmethod
    def empty(schema: PySchema | None = None) -> PyMicroPartition: ...
    @staticmethod
    def from_record_batches(record_batches: list[PyRecordBatch]) -> PyMicroPartition: ...
    @staticmethod
    def from_arrow_record_batches(record_batches: list[pa.RecordBatch], schema: PySchema) -> PyMicroPartition: ...
    @staticmethod
    def concat(tables: list[PyMicroPartition]) -> PyMicroPartition: ...
    @staticmethod
    def concat_or_empty(tables: list[PyMicroPartition], schema: PySchema) -> PyMicroPartition: ...
    # `bytes` shadows the builtin; kept as-is for keyword-call compatibility.
    @staticmethod
    def read_from_ipc_stream(bytes: bytes) -> PyMicroPartition: ...
    def write_to_ipc_stream(self) -> bytes: ...
    # --- relational operations ---
    def slice(self, start: int, end: int) -> PyMicroPartition: ...
    def to_record_batch(self) -> PyRecordBatch: ...
    def cast_to_schema(self, schema: PySchema) -> PyMicroPartition: ...
    def eval_expression_list(self, exprs: list[PyExpr]) -> PyMicroPartition: ...
    def take(self, idx: PySeries) -> PyMicroPartition: ...
    def filter(self, exprs: list[PyExpr]) -> PyMicroPartition: ...
    def sort(self, sort_keys: list[PyExpr], descending: list[bool], nulls_first: list[bool]) -> PyMicroPartition: ...
    def argsort(self, sort_keys: list[PyExpr], descending: list[bool], nulls_first: list[bool]) -> PySeries: ...
    def agg(self, to_agg: list[PyExpr], group_by: list[PyExpr]) -> PyMicroPartition: ...
    def dedup(self, columns: list[PyExpr]) -> PyMicroPartition: ...
    def hash_join(
        self,
        right: PyMicroPartition,
        left_on: list[PyExpr],
        right_on: list[PyExpr],
        how: JoinType,
        null_equals_nulls: list[bool] | None = None,
    ) -> PyMicroPartition: ...
    def pivot(
        self,
        group_by: list[PyExpr],
        pivot_column: PyExpr,
        values_column: PyExpr,
        names: list[str],
    ) -> PyMicroPartition: ...
    def sort_merge_join(
        self,
        right: PyMicroPartition,
        left_on: list[PyExpr],
        right_on: list[PyExpr],
        is_sorted: bool,
    ) -> PyMicroPartition: ...
    def cross_join(
        self,
        right: PyMicroPartition,
        outer_loop_side: JoinSide,
    ) -> PyMicroPartition: ...
    def explode(self, to_explode: list[PyExpr]) -> PyMicroPartition: ...
    def unpivot(
        self,
        ids: list[PyExpr],
        values: list[PyExpr],
        variable_name: str,
        value_name: str,
    ) -> PyMicroPartition: ...
    def head(self, num: int) -> PyMicroPartition: ...
    def sample_by_fraction(self, fraction: float, with_replacement: bool, seed: int | None) -> PyMicroPartition: ...
    def sample_by_size(self, size: int, with_replacement: bool, seed: int | None) -> PyMicroPartition: ...
    def quantiles(self, num: int) -> PyMicroPartition: ...
    # --- partitioning ---
    def partition_by_hash(self, exprs: list[PyExpr], num_partitions: int) -> list[PyMicroPartition]: ...
    def partition_by_random(self, num_partitions: int, seed: int) -> list[PyMicroPartition]: ...
    def partition_by_range(
        self, partition_keys: list[PyExpr], boundaries: PyRecordBatch, descending: list[bool]
    ) -> list[PyMicroPartition]: ...
    # NOTE(review): parameter is named `exprs` here but `partition_keys` in the
    # PyRecordBatch counterpart — inconsistent; verify against the native binding
    # before unifying, since keyword callers depend on the names.
    def partition_by_value(self, exprs: list[PyExpr]) -> tuple[list[PyMicroPartition], PyMicroPartition]: ...
    def add_monotonically_increasing_id(self, partition_num: int, column_name: str) -> PyMicroPartition: ...
    def __repr__(self) -> str: ...
    def __len__(self) -> int: ...
    # --- file readers ---
    @classmethod
    def read_parquet(
        cls,
        path: str,
        columns: list[str] | None = None,
        start_offset: int | None = None,
        num_rows: int | None = None,
        row_groups: list[int] | None = None,
        predicate: PyExpr | None = None,
        io_config: IOConfig | None = None,
        multithreaded_io: bool | None = None,
        coerce_int96_timestamp_unit: PyTimeUnit = PyTimeUnit.nanoseconds(),
    ) -> PyMicroPartition: ...
    @classmethod
    def read_csv(
        cls,
        uri: str,
        convert_options: CsvConvertOptions | None = None,
        parse_options: CsvParseOptions | None = None,
        read_options: CsvReadOptions | None = None,
        io_config: IOConfig | None = None,
        multithreaded_io: bool | None = None,
    ) -> PyMicroPartition: ...
    @classmethod
    def read_json_native(
        cls,
        uri: str,
        convert_options: JsonConvertOptions | None = None,
        parse_options: JsonParseOptions | None = None,
        read_options: JsonReadOptions | None = None,
        io_config: IOConfig | None = None,
        multithreaded_io: bool | None = None,
    ) -> PyMicroPartition: ...
    @classmethod
    def read_warc(
        cls,
        uri: str,
        io_config: IOConfig | None = None,
        multithreaded_io: bool | None = None,
    ) -> PyMicroPartition: ...

class PyMicroPartitionSet:
    """Stub for an integer-indexed collection of micro-partitions.

    Supports get/set/delete/has by partition index, size introspection, and
    merging/previewing the contained partitions.
    """

    def __init__(self) -> None: ...
    def get_partition(self, idx: int) -> PyMicroPartition: ...
    def set_partition(self, idx: int, part: PyMicroPartition) -> None: ...
    def delete_partition(self, idx: int) -> None: ...
    def has_partition(self, idx: int) -> bool: ...
    def __len__(self) -> int: ...
    def size_bytes(self) -> int | None: ...
    def num_partitions(self) -> int: ...
    def wait(self) -> None: ...
    def get_merged_micropartition(self) -> PyMicroPartition: ...
    def get_preview_micropartitions(self, num_rows: int) -> list[PyMicroPartition]: ...
    def items(self) -> list[tuple[int, PyMicroPartition]]: ...

class LogicalPlanBuilder:
    """A logical plan builder, which simplifies constructing logical plans via a fluent interface.

    E.g., LogicalPlanBuilder.table_scan(..).project(..).filter(..).

    This builder holds the current root (sink) of the logical plan, and the building methods return
    a brand new builder holding a new plan; i.e., this is an immutable builder.
    """

    # --- sources ---
    @staticmethod
    def in_memory_scan(
        partition_key: str,
        cache_entry: PartitionCacheEntry,
        schema: PySchema,
        num_partitions: int,
        size_bytes: int,
        num_rows: int,
    ) -> LogicalPlanBuilder: ...
    @staticmethod
    def from_glob_scan(
        glob_paths: list[str],
        io_config: IOConfig | None = None,
    ) -> LogicalPlanBuilder: ...
    def with_planning_config(self, daft_planning_config: PyDaftPlanningConfig) -> LogicalPlanBuilder: ...
    # --- projections / row-level operators ---
    def select(self, to_select: list[PyExpr]) -> LogicalPlanBuilder: ...
    def with_columns(self, columns: list[PyExpr]) -> LogicalPlanBuilder: ...
    def with_columns_renamed(self, cols_map: dict[str, str]) -> LogicalPlanBuilder: ...
    def exclude(self, to_exclude: list[str]) -> LogicalPlanBuilder: ...
    def filter(self, predicate: PyExpr) -> LogicalPlanBuilder: ...
    def limit(self, limit: int, eager: bool) -> LogicalPlanBuilder: ...
    def offset(self, offset: int) -> LogicalPlanBuilder: ...
    def shard(self, strategy: str, world_size: int, rank: int) -> LogicalPlanBuilder: ...
    def explode(self, to_explode: list[PyExpr]) -> LogicalPlanBuilder: ...
    def unpivot(
        self,
        ids: list[PyExpr],
        values: list[PyExpr],
        variable_name: str,
        value_name: str,
    ) -> LogicalPlanBuilder: ...
    def sort(self, sort_by: list[PyExpr], descending: list[bool], nulls_first: list[bool]) -> LogicalPlanBuilder: ...
    # --- repartitioning ---
    def hash_repartition(
        self,
        partition_by: list[PyExpr],
        num_partitions: int | None,
    ) -> LogicalPlanBuilder: ...
    def random_shuffle(self, num_partitions: int | None) -> LogicalPlanBuilder: ...
    def into_partitions(self, num_partitions: int) -> LogicalPlanBuilder: ...
    def into_batches(self, batch_size: int) -> LogicalPlanBuilder: ...
    def coalesce(self, num_partitions: int) -> LogicalPlanBuilder: ...
    # --- aggregation / set operations / joins ---
    def distinct(self, on: list[PyExpr]) -> LogicalPlanBuilder: ...
    def sample(
        self, fraction: float | None, size: int | None, with_replacement: bool, seed: int | None
    ) -> LogicalPlanBuilder: ...
    def aggregate(self, agg_exprs: list[PyExpr], groupby_exprs: list[PyExpr]) -> LogicalPlanBuilder: ...
    def pivot(
        self,
        groupby_exprs: list[PyExpr],
        pivot_expr: PyExpr,
        values_expr: PyExpr,
        agg_expr: PyExpr,
        names: list[str],
    ) -> LogicalPlanBuilder: ...
    def join(
        self,
        right: LogicalPlanBuilder,
        left_on: list[PyExpr],
        right_on: list[PyExpr],
        join_type: JoinType,
        join_strategy: JoinStrategy | None = None,
        prefix: str | None = None,
        suffix: str | None = None,
    ) -> LogicalPlanBuilder: ...
    def concat(self, other: LogicalPlanBuilder) -> LogicalPlanBuilder: ...
    def union(self, other: LogicalPlanBuilder, is_all: bool, is_by_name: bool) -> LogicalPlanBuilder: ...
    def intersect(self, other: LogicalPlanBuilder, is_all: bool) -> LogicalPlanBuilder: ...
    # Trailing underscore avoids clashing with the `except` keyword.
    def except_(self, other: LogicalPlanBuilder, is_all: bool) -> LogicalPlanBuilder: ...
    def add_monotonically_increasing_id(self, column_name: str | None) -> LogicalPlanBuilder: ...
    # --- sinks (writes) ---
    def table_write(
        self,
        root_dir: str,
        write_mode: WriteMode,
        file_format: FileFormat,
        partition_cols: list[PyExpr] | None = None,
        compression: str | None = None,
        io_config: IOConfig | None = None,
    ) -> LogicalPlanBuilder: ...
    def iceberg_write(
        self,
        table_name: str,
        table_location: str,
        partition_spec_id: int,
        partition_cols: list[PyExpr],
        iceberg_schema: IcebergSchema,
        iceberg_properties: IcebergTableProperties,
        catalog_columns: list[str],
        io_config: IOConfig | None = None,
    ) -> LogicalPlanBuilder: ...
    def delta_write(
        self,
        path: str,
        columns_name: list[str],
        mode: str,
        version: int,
        large_dtypes: bool,
        partition_cols: list[str] | None = None,
        io_config: IOConfig | None = None,
    ) -> LogicalPlanBuilder: ...
    def lance_write(
        self,
        path: str,
        columns_name: list[str],
        mode: str,
        io_config: IOConfig | None = None,
        kwargs: dict[str, Any] | None = None,
    ) -> LogicalPlanBuilder: ...
    def datasink_write(self, name: str, sink: DataSink[WriteResultType]) -> LogicalPlanBuilder: ...
    # --- introspection / optimization / rendering ---
    def schema(self) -> PySchema: ...
    def describe(self) -> LogicalPlanBuilder: ...
    def summarize(self) -> LogicalPlanBuilder: ...
    def optimize(self, execution_config: PyDaftExecutionConfig) -> LogicalPlanBuilder: ...
    def repr_ascii(self, simple: bool) -> str: ...
    def repr_mermaid(self, options: MermaidOptions) -> str: ...
    def repr_json(self, include_schema: bool) -> str: ...

class DistributedPhysicalPlan:
    """Stub for a physical plan compiled from a logical plan for distributed execution."""

    @staticmethod
    def from_logical_plan_builder(
        builder: LogicalPlanBuilder, query_id: str, config: PyDaftExecutionConfig
    ) -> DistributedPhysicalPlan: ...
    # Returns a string identifier (despite the integer-sounding name).
    def idx(self) -> str: ...
    def num_partitions(self) -> int: ...
    def repr_ascii(self, simple: bool) -> str: ...
    def repr_mermaid(self, options: MermaidOptions) -> str: ...

class DistributedPhysicalPlanRunner:
    """Stub for the runner that executes a DistributedPhysicalPlan over Ray partition refs."""

    def __init__(self) -> None: ...
    # `psets` maps partition-set keys to their materialized Ray partition refs;
    # results are streamed back asynchronously.
    def run_plan(
        self, plan: DistributedPhysicalPlan, psets: dict[str, list[RayPartitionRef]]
    ) -> AsyncIterator[RayPartitionRef]: ...

class LocalPhysicalPlan:
    """Stub for a physical plan compiled for single-node (local) execution."""

    @staticmethod
    def from_logical_plan_builder(builder: LogicalPlanBuilder) -> LocalPhysicalPlan: ...

class RayPartitionRef:
    """Stub for a handle to a partition stored in Ray's object store, with cached metadata."""

    object_ref: ray.ObjectRef  # Ray object-store reference to the partition data
    num_rows: int  # row count of the referenced partition
    size_bytes: int  # size in bytes of the referenced partition

    def __init__(self, object_ref: ray.ObjectRef, num_rows: int, size_bytes: int) -> None: ...

class RaySwordfishTask:
    """Stub for a unit of work dispatched to a swordfish worker: a local plan plus its inputs and config."""

    def name(self) -> str: ...
    def plan(self) -> LocalPhysicalPlan: ...
    def psets(self) -> dict[str, list[RayPartitionRef]]: ...
    def config(self) -> PyDaftExecutionConfig: ...
    def context(self) -> dict[str, str]: ...

class RayTaskResult:
    """Stub for the outcome of a Ray task: success with outputs/stats, or a worker failure state."""

    @staticmethod
    def success(ray_part_refs: list[RayPartitionRef], stats: bytes) -> RayTaskResult: ...
    @staticmethod
    def worker_died() -> RayTaskResult: ...
    @staticmethod
    def worker_unavailable() -> RayTaskResult: ...

class RaySwordfishWorker:
    """Stub describing a swordfish worker: its actor handle plus advertised CPU/GPU/memory resources."""

    def __init__(
        self,
        worker_id: str,
        actor_handle: RaySwordfishActorHandle,
        num_cpus: int,
        num_gpus: int,
        total_memory_bytes: int,
    ) -> None: ...

class PyExecutionEngineResult:
    """Stub for an async stream of result micro-partitions from the execution engine.

    Iterate with `async for`; `__anext__` yields None at times (presumably when no
    partition is ready — TODO confirm) and `finish()` returns final stats as bytes.
    """

    def __aiter__(self) -> PyExecutionEngineResult: ...
    async def __anext__(self) -> PyMicroPartition | None: ...
    async def finish(self) -> bytes: ...

class NativeExecutor:
    """Stub for the native single-node executor that runs a LocalPhysicalPlan."""

    def __init__(self) -> None: ...
    def run(
        self,
        plan: LocalPhysicalPlan,
        psets: dict[str, list[PyMicroPartition]],
        daft_ctx: PyDaftContext,
        results_buffer_size: int | None = None,
        context: dict[str, str] | None = None,
    ) -> PyExecutionEngineResult: ...
    # --- plan rendering helpers (operate on a builder, not an executor instance) ---
    @staticmethod
    def repr_ascii(builder: LogicalPlanBuilder, daft_execution_config: PyDaftExecutionConfig, simple: bool) -> str: ...
    @staticmethod
    def repr_mermaid(
        builder: LogicalPlanBuilder, daft_execution_config: PyDaftExecutionConfig, options: MermaidOptions
    ) -> str: ...
    @staticmethod
    def get_relationship_info(
        logical_plan_builder: LogicalPlanBuilder,
        daft_execution_config: PyDaftExecutionConfig,
    ) -> RelationshipInformation: ...

class RelationshipInformation:
    """Stub for parent/child relationship info of a plan's nodes (see RelationshipNode)."""

    ids: list[RelationshipNode]  # all nodes in the plan
    plan_id: str  # identifier of the plan these nodes belong to

class RelationshipNode:
    """Stub for one node in a plan's relationship graph."""

    id: int  # node identifier
    parent_id: int | None  # parent node id; None presumably marks the root — TODO confirm

class PyDaftExecutionConfig:
    """Stub for the immutable execution configuration.

    Construct from environment via `from_env()`; derive updated copies via
    `with_config_values(...)`, where every keyword defaults to None (meaning
    "keep the current value"). Each setting is also exposed as a read-only
    property below.
    """

    @staticmethod
    def from_env() -> PyDaftExecutionConfig: ...
    def with_config_values(
        self,
        enable_scan_task_split_and_merge: bool | None = None,
        scan_tasks_min_size_bytes: int | None = None,
        scan_tasks_max_size_bytes: int | None = None,
        max_sources_per_scan_task: int | None = None,
        parquet_split_row_groups_max_files: int | None = None,
        broadcast_join_size_bytes_threshold: int | None = None,
        hash_join_partition_size_leniency: float | None = None,
        sample_size_for_sort: int | None = None,
        num_preview_rows: int | None = None,
        parquet_target_filesize: int | None = None,
        parquet_target_row_group_size: int | None = None,
        parquet_inflation_factor: float | None = None,
        csv_target_filesize: int | None = None,
        csv_inflation_factor: float | None = None,
        json_inflation_factor: float | None = None,
        shuffle_aggregation_default_partitions: int | None = None,
        partial_aggregation_threshold: int | None = None,
        high_cardinality_aggregation_threshold: float | None = None,
        read_sql_partition_size_bytes: int | None = None,
        default_morsel_size: int | None = None,
        shuffle_algorithm: str | None = None,
        pre_shuffle_merge_threshold: int | None = None,
        scantask_max_parallel: int | None = None,
        native_parquet_writer: bool | None = None,
        min_cpu_per_task: float | None = None,
        actor_udf_ready_timeout: int | None = None,
        maintain_order: bool | None = None,
        enable_dynamic_batching: bool | None = None,
        dynamic_batching_strategy: str | None = None,
    ) -> PyDaftExecutionConfig: ...
    @property
    def enable_scan_task_split_and_merge(self) -> bool: ...
    @property
    def scan_tasks_min_size_bytes(self) -> int: ...
    @property
    def scan_tasks_max_size_bytes(self) -> int: ...
    @property
    def max_sources_per_scan_task(self) -> int: ...
    @property
    def parquet_split_row_groups_max_files(self) -> int: ...
    @property
    def broadcast_join_size_bytes_threshold(self) -> int: ...
    @property
    def hash_join_partition_size_leniency(self) -> float: ...
    @property
    def sample_size_for_sort(self) -> int: ...
    @property
    def num_preview_rows(self) -> int: ...
    @property
    def parquet_target_filesize(self) -> int: ...
    @property
    def parquet_target_row_group_size(self) -> int: ...
    @property
    def parquet_inflation_factor(self) -> float: ...
    @property
    def csv_target_filesize(self) -> int: ...
    @property
    def csv_inflation_factor(self) -> float: ...
    @property
    def json_inflation_factor(self) -> float: ...
    @property
    def shuffle_aggregation_default_partitions(self) -> int: ...
    @property
    def partial_aggregation_threshold(self) -> int: ...
    @property
    def high_cardinality_aggregation_threshold(self) -> float: ...
    @property
    def read_sql_partition_size_bytes(self) -> int: ...
    @property
    def default_morsel_size(self) -> int: ...
    @property
    def shuffle_algorithm(self) -> str: ...
    @property
    def pre_shuffle_merge_threshold(self) -> int: ...
    # NOTE(review): `flight_shuffle_dirs` has no matching keyword in
    # `with_config_values` above; likewise `native_parquet_writer` and
    # `maintain_order` are settable but have no property here — verify against
    # the native binding whether these asymmetries are intentional.
    @property
    def flight_shuffle_dirs(self) -> list[str]: ...
    @property
    def min_cpu_per_task(self) -> float: ...
    @property
    def actor_udf_ready_timeout(self) -> int: ...
    @property
    def scantask_max_parallel(self) -> int: ...
    @property
    def enable_dynamic_batching(self) -> bool: ...
    @property
    def dynamic_batching_strategy(self) -> str: ...

class PyDaftPlanningConfig:
    """Stub for the planning-time configuration; same from_env/with_config_values
    pattern as PyDaftExecutionConfig (None keywords mean "keep current value")."""

    @staticmethod
    def from_env() -> PyDaftPlanningConfig: ...
    def with_config_values(
        self,
        default_io_config: IOConfig | None = None,
        enable_strict_filter_pushdown: bool | None = None,
    ) -> PyDaftPlanningConfig: ...
    @property
    def default_io_config(self) -> IOConfig: ...
    @property
    def enable_strict_filter_pushdown(self) -> bool: ...

class StatType(Enum):
    """Kinds of statistics values (how a stat should be interpreted/rendered)."""

    COUNT = 0
    BYTES = 1
    PERCENT = 2
    FLOAT = 3
    # NOTE(review): value 4 is skipped — presumably a removed/reserved variant in
    # the native enum; confirm before renumbering.
    DURATION = 5

# TODO(void001): Implement Dead state
class QueryEndState(Enum):
    """Terminal states a query can end in."""

    Finished = 0
    Canceled = 1
    Failed = 2
    Dead = 3

class PyQueryMetadata:
    """Stub for metadata captured at query start: output schema and the unoptimized plan text."""

    output_schema: PySchema  # schema of the query's output
    unoptimized_plan: str  # textual representation of the plan before optimization

    def __init__(self, output_schema: PySchema, unoptimized_plan: str) -> None: ...

class PyQueryResult:
    """Stub for a query's terminal outcome: end state plus an optional error message."""

    end_state: QueryEndState  # how the query terminated
    error_message: str | None  # populated on failure; None otherwise — TODO confirm

    def __init__(self, end_state: QueryEndState, error_message: str | None) -> None: ...

class PyDaftContext:
    """Stub for the global Daft context: holds execution/planning configs and
    fans out query lifecycle notifications to attached subscribers."""

    def __init__(self) -> None: ...

    # Underlying config objects (non-public attributes).
    _daft_execution_config: PyDaftExecutionConfig
    _daft_planning_config: PyDaftPlanningConfig

    @property
    def daft_execution_config(self) -> PyDaftExecutionConfig: ...
    @property
    def daft_planning_config(self) -> PyDaftPlanningConfig: ...

    # Subscriber methods: attach/detach by alias, then notify_* broadcasts
    # query lifecycle events (start, end, results, optimization phases).
    def attach_subscriber(self, alias: str, subscriber: Subscriber) -> None: ...
    def detach_subscriber(self, alias: str) -> None: ...
    def notify_query_start(self, query_id: str, metadata: PyQueryMetadata) -> None: ...
    def notify_query_end(self, query_id: str, query_result: PyQueryResult) -> None: ...
    def notify_result_out(self, query_id: str, result: PartitionT) -> None: ...
    def notify_optimization_start(self, query_id: str) -> None: ...
    def notify_optimization_end(self, query_id: str, optimized_plan: str) -> None: ...

# --- module-level runner/context functions ---

def set_runner_ray(
    address: str | None = None,
    noop_if_initialized: bool = False,
    max_task_backlog: int | None = None,
    force_client_mode: bool = False,
) -> Runner[PartitionT]: ...
def set_runner_native(num_threads: int | None = None) -> Runner[PartitionT]: ...
def get_or_create_runner() -> Runner[PartitionT]: ...
def get_or_infer_runner_type() -> str: ...
# Returns None when no runner has been set yet — TODO confirm.
def get_runner() -> Runner[PartitionT] | None: ...
def get_context() -> PyDaftContext: ...
def build_type() -> str: ...
def version() -> str: ...
def refresh_logger() -> None: ...
def get_max_log_level() -> str: ...
def set_compute_runtime_num_worker_threads(num_threads: int) -> None: ...
# Glob a (possibly remote) path; each result dict presumably holds per-file
# metadata — verify the exact keys against the native implementation.
def io_glob(
    path: str,
    multithreaded_io: bool | None = None,
    io_config: IOConfig | None = None,
    fanout_limit: int | None = None,
    page_size: int | None = None,
    limit: int | None = None,
) -> list[dict[str, Any]]: ...

class SystemInfo:
    """Accessor for system information."""

    def __init__(self) -> None: ...
    def total_memory(self) -> int: ...
    # None presumably when the CPU count cannot be determined — TODO confirm.
    def cpu_count(self) -> int | None: ...

###
# daft-catalog
###

class PyCatalog:
    """Stub for a catalog: namespace/table CRUD and listing, keyed by PyIdentifier."""

    def name(self) -> str: ...
    def create_namespace(self, ident: PyIdentifier) -> None: ...
    def create_table(self, ident: PyIdentifier, schema: PySchema) -> Table: ...
    def drop_namespace(self, ident: PyIdentifier) -> None: ...
    def drop_table(self, ident: PyIdentifier) -> None: ...
    def get_table(self, ident: PyIdentifier) -> Table: ...
    def has_table(self, ident: PyIdentifier) -> bool: ...
    def has_namespace(self, ident: PyIdentifier) -> bool: ...
    # `pattern=None` lists everything; pattern semantics are defined natively.
    def list_namespaces(self, pattern: str | None = None) -> list[PyIdentifier]: ...
    def list_tables(self, pattern: str | None = None) -> list[PyIdentifier]: ...
    @staticmethod
    def new_memory_catalog(name: str) -> Catalog: ...

class PyTable:
    """Stub for a catalog table: schema access, reading as a logical plan, and append/overwrite writes."""

    def name(self) -> str: ...
    def schema(self) -> PySchema: ...
    def to_logical_plan(self) -> LogicalPlanBuilder: ...
    # `options` are write options forwarded to the native implementation.
    def append(self, plan: LogicalPlanBuilder, **options: Any) -> None: ...
    def overwrite(self, plan: LogicalPlanBuilder, **options: Any) -> None: ...
    @staticmethod
    def new_memory_table(name: str, schema: PySchema) -> Table: ...

class PyIdentifier:
    """Stub for a multi-part (namespaced) identifier, e.g. catalog.schema.table."""

    def __init__(self, parts: tuple[str, ...]) -> None: ...
    # `input` shadows the builtin; kept as-is for keyword-call compatibility.
    @staticmethod
    def from_sql(input: str, normalize: bool) -> PyIdentifier: ...
    def eq(self, other: PyIdentifier) -> bool: ...
    def getitem(self, index: int) -> str: ...
    def __len__(self) -> int: ...
    def __repr__(self) -> str: ...
    def __hash__(self) -> int: ...

class PyTableSource:
    """Stub for a table source: either a bare schema or a logical plan producing rows."""

    @staticmethod
    def from_pyschema(schema: PySchema) -> PyTableSource: ...
    @staticmethod
    def from_pybuilder(builder: LogicalPlanBuilder) -> PyTableSource: ...

###
# daft-session
###

class PySession:
    """Stub for a session: attach/detach catalogs, functions, providers, and tables
    by alias, and manage the current catalog/namespace/provider/model selections."""

    def __init__(self) -> None: ...
    @staticmethod
    def empty() -> PySession: ...
    # --- attach/detach by alias ---
    def attach_catalog(self, catalog: Catalog, alias: str) -> None: ...
    def attach_function(self, function: UDF, alias: str | None = None) -> None: ...
    def attach_provider(self, provider: Provider, alias: str) -> None: ...
    def attach_table(self, table: Table, alias: str) -> None: ...
    def detach_catalog(self, alias: str) -> None: ...
    def detach_function(self, alias: str) -> None: ...
    def detach_provider(self, alias: str) -> None: ...
    def detach_table(self, alias: str) -> None: ...
    def create_temp_table(self, ident: str, source: PyTableSource, replace: bool) -> Table: ...
    # --- current selections (None when unset) ---
    def current_catalog(self) -> Catalog | None: ...
    def current_namespace(self) -> PyIdentifier | None: ...
    def current_provider(self) -> Provider | None: ...
    def current_model(self) -> str | None: ...
    # --- lookups ---
    def get_catalog(self, ident: str) -> Catalog: ...
    def get_provider(self, ident: str) -> Provider: ...
    def get_table(self, ident: PyIdentifier) -> Table: ...
    def has_catalog(self, ident: str) -> bool: ...
    def has_provider(self, ident: str) -> bool: ...
    def has_table(self, ident: PyIdentifier) -> bool: ...
    def list_catalogs(self, pattern: str | None = None) -> list[str]: ...
    def list_tables(self, pattern: str | None = None) -> list[PyIdentifier]: ...
    # --- setters (None clears the selection — TODO confirm) ---
    def set_catalog(self, ident: str | None) -> None: ...
    def set_namespace(self, ident: PyIdentifier | None) -> None: ...
    def set_provider(self, ident: str | None) -> None: ...
    def set_model(self, ident: str | None) -> None: ...

class InProgressShuffleCache:
    """Stub for a shuffle cache being written: partitions are pushed in
    asynchronously, then `close()` seals it into a readable ShuffleCache."""

    @staticmethod
    def try_new(
        num_partitions: int,
        dirs: list[str],
        node_id: str,
        shuffle_stage_id: int,
        target_filesize: int,
        compression: str | None = None,
        partition_by: list[PyExpr] | None = None,
    ) -> InProgressShuffleCache: ...
    async def push_partitions(self, input_partitions: list[PyMicroPartition]) -> None: ...
    async def close(self) -> ShuffleCache: ...

class ShuffleCache:
    """Stub for a sealed shuffle cache: per-partition file/row/byte accounting plus cleanup."""

    def schema(self) -> PySchema: ...
    def file_paths_for_partition(self, partition_idx: int) -> list[str]: ...
    def bytes_per_file_for_partition(self, partition_idx: int) -> list[int]: ...
    def rows_per_partition(self) -> list[int]: ...
    def bytes_per_partition(self) -> list[int]: ...
    def clear_partition(self, partition_idx: int) -> None: ...
    def clear_directories(self) -> None: ...

class FlightServerConnectionHandle:
    """Stub for a handle to a running flight server: exposes its port and shutdown."""

    def shutdown(self) -> None: ...
    def port(self) -> int: ...

# Serve a sealed shuffle cache over a flight server bound to `ip`; the returned
# handle reports the chosen port and allows shutdown.
def start_flight_server(
    shuffle_cache: ShuffleCache,
    ip: str,
) -> FlightServerConnectionHandle: ...

class FlightClientManager:
    """Stub for a client that fetches shuffle partitions from flight servers at
    the given addresses, with bounded fetch parallelism."""

    def __init__(self, addresses: list[str], num_parallel_fetches: int, schema: PySchema) -> None: ...
    async def fetch_partition(self, partition: int) -> PyMicroPartition: ...

# Entry point for the native CLI; `args` are the command-line arguments.
def cli(args: list[str]) -> None: ...

class PyScalarFunction:
    """Stub for a callable scalar function: applying it to expressions yields a new expression."""

    def __call__(self, *args: PyExpr, **kwargs: PyExpr) -> PyExpr: ...

# Look up a scalar function by name in the native function registry.
def get_function_from_registry(name: str) -> PyScalarFunction: ...
# Round-trips a builder through the proto representation (presumably a
# serialization self-test helper — TODO confirm).
def to_from_proto(builder: LogicalPlanBuilder) -> LogicalPlanBuilder: ...

class PyFileReference:
    """Stub for a lazy reference to a file; entering it as a context manager
    yields an opened PyDaftFile."""

    # `tuple` shadows the builtin; kept as-is for keyword-call compatibility.
    @staticmethod
    def _from_tuple(tuple: tuple[Any]) -> PyFileReference: ...
    def __enter__(self) -> PyDaftFile: ...
    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
    # --- IOBase-style capability probes ---
    def seekable(self) -> bool: ...
    def readable(self) -> bool: ...
    def isatty(self) -> bool: ...
    def writable(self) -> bool: ...

class PyDaftFile:
    """Stub for a readable file object backed either by a path or by in-memory bytes.

    Exposes a file-like read/seek/tell/close surface plus size and MIME-type
    helpers; usable as a context manager.
    """

    def __init__(self, path: str | None = None, data: bytes | None = None) -> None: ...
    # size=None reads the remainder of the file.
    def read(self, size: int | None = None) -> bytes: ...
    def seek(self, offset: int, whence: int = 0) -> int: ...
    def tell(self) -> int: ...
    def close(self) -> None: ...
    def __str__(self) -> str: ...
    # NOTE(review): `closed` is a method here, whereas io.IOBase exposes it as a
    # property — callers must invoke it; confirm against the native binding.
    def closed(self) -> bool: ...
    def _supports_range_requests(self) -> bool: ...
    def size(self) -> int: ...
    def guess_mime_type(self) -> str | None: ...
    @staticmethod
    def _from_file_reference(ref: PyFileReference) -> PyDaftFile: ...
    def __enter__(self) -> PyDaftFile: ...
    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...

class PyMediaType:
    """Stub for a media-type tag with factory constructors for each variant."""

    @staticmethod
    def unknown() -> PyMediaType: ...
    @staticmethod
    def video() -> PyMediaType: ...
    @staticmethod
    def audio() -> PyMediaType: ...

# Guess a MIME type from raw content; returns None when undetermined.
# (`bytes` shadows the builtin; kept as-is for keyword-call compatibility.)
def guess_mimetype_from_content(bytes: bytes) -> str | None: ...
